Merge pull request #10734 from imbeacon/improvement/mqtt-reason-codes

Added MQTT Disconnect messages from server with reason codes
This commit is contained in:
Andrew Shvayka 2024-05-24 16:59:46 +03:00 committed by GitHub
commit 181f117c64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 423 additions and 212 deletions

View File

@ -90,6 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCre
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.gen.transport.TransportProtos.UplinkNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseReason;
import org.thingsboard.server.service.rpc.RpcSubmitStrategy;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@ -845,7 +846,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
});
} else {
sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!"));
sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!", SessionCloseReason.CREDENTIALS_UPDATED));
attributeSubscriptions.clear();
rpcSubscriptions.clear();
dumpSessions();
@ -855,13 +856,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd);
notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!");
notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!", SessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED);
}
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message) {
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message, SessionCloseReason reason) {
SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto
.newBuilder()
.setMessage(message).build();
.setMessage(message)
.setReason(reason)
.build();
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
@ -1044,7 +1047,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
attributeSubscriptions.remove(id);
if (session != null) {
removed++;
notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE);
notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE, SessionCloseReason.SESSION_TIMEOUT);
}
}
if (removed != 0) {

View File

@ -972,6 +972,8 @@ transport:
proxy_enabled: "${MQTT_PROXY_PROTOCOL_ENABLED:false}"
# MQTT processing timeout in milliseconds
timeout: "${MQTT_TIMEOUT:10000}"
# MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism
netty:
# Netty leak detector level

View File

@ -466,6 +466,13 @@ public class JsonConverter {
return result;
}
public static JsonObject toGatewayDeviceDisconnectJson(String deviceName, int reasonCode) {
JsonObject result = new JsonObject();
result.addProperty(DEVICE_PROPERTY, deviceName);
result.addProperty("reason", reasonCode);
return result;
}
public static JsonElement toErrorJson(String errorMsg) {
JsonObject error = new JsonObject();
error.addProperty("error", errorMsg);

View File

@ -397,6 +397,11 @@ message GetOrCreateDeviceFromGatewayResponseMsg {
TransportApiRequestErrorCode error = 3;
}
message GatewayDisconnectDeviceMsg {
string deviceName = 1;
int32 reasonCode = 2;
}
enum TransportApiRequestErrorCode {
UNKNOWN_TRANSPORT_API_ERROR = 0;
ENTITY_LIMIT = 1;
@ -579,8 +584,16 @@ message ResourceDeleteMsg {
string resourceKey = 4;
}
enum SessionCloseReason {
UNKNOWN_REASON = 0;
CREDENTIALS_UPDATED = 1;
MAX_CONCURRENT_SESSIONS_LIMIT_REACHED = 2;
SESSION_TIMEOUT = 3;
}
message SessionCloseNotificationProto {
string message = 1;
SessionCloseReason reason = 2;
}
message SubscribeToAttributeUpdatesMsg {

View File

@ -71,6 +71,10 @@ public class MqttTransportContext extends TransportContext {
@Value("${transport.mqtt.timeout:10000}")
private long timeout;
@Getter
@Value("${transport.mqtt.disconnect_timeout:1000}")
private long disconnectTimeout;
@Getter
@Value("${transport.mqtt.proxy_enabled:false}")
private boolean proxyEnabled;

View File

@ -31,6 +31,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
@ -68,7 +69,6 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.util.SslUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -82,7 +82,6 @@ import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader;
@ -196,23 +195,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processMqttMsg(ctx, message);
} else {
log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage());
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
}
} else {
log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName());
closeCtx(ctx);
closeCtx(ctx, (MqttMessage) null);
}
} finally {
ReferenceCountUtil.safeRelease(msg);
}
}
private void closeCtx(ChannelHandlerContext ctx) {
private void closeCtx(ChannelHandlerContext ctx, MqttReasonCodes.Disconnect returnCode) {
closeCtx(ctx, returnCode.byteValue());
}
private void closeCtx(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) {
closeCtx(ctx, ReturnCodeResolver.getConnectionReturnCode(deviceSessionCtx.getMqttVersion(), returnCode).byteValue());
}
private void closeCtx(ChannelHandlerContext ctx, byte returnCode) {
closeCtx(ctx, createMqttDisconnectMsg(deviceSessionCtx, returnCode));
}
private void closeCtx(ChannelHandlerContext ctx, MqttMessage msg) {
if (!rpcAwaitingAck.isEmpty()) {
log.debug("[{}] Cleanup RPC awaiting ack map due to session close!", sessionId);
rpcAwaitingAck.clear();
}
ctx.close();
if (ctx.channel() == null) {
log.debug("[{}] Channel is null, closing ctx...", sessionId);
ctx.close();
} else if (ctx.channel().isOpen()) {
if (msg != null && MqttVersion.MQTT_5 == deviceSessionCtx.getMqttVersion()) {
ChannelFuture channelFuture = ctx.writeAndFlush(msg).addListener(future -> ctx.close());
scheduler.schedule(() -> {
if (!channelFuture.isDone()) {
log.debug("[{}] Closing channel due to timeout!", sessionId);
ctx.close();
}
}, context.getDisconnectTimeout(), TimeUnit.MILLISECONDS);
} else {
ctx.close();
}
} else {
log.debug("[{}] Channel is already closed!", sessionId);
}
}
InetSocketAddress getAddress(ChannelHandlerContext ctx) {
@ -231,7 +260,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
return;
}
deviceSessionCtx.setChannel(ctx);
@ -268,21 +297,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} else {
log.debug("[{}] Unsupported topic for provisioning requests: {}!", sessionId, topicName);
closeCtx(ctx);
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
closeCtx(ctx, MqttReasonCodes.Disconnect.TOPIC_NAME_INVALID);
}
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
closeCtx(ctx);
ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
closeCtx(ctx);
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
break;
case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
break;
case DISCONNECT:
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
break;
}
}
@ -292,7 +323,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.QUOTA_EXCEEDED);
return;
}
@ -329,7 +360,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
break;
case DISCONNECT:
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
break;
case PUBACK:
int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
@ -389,15 +420,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
break;
default:
ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID);
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
}
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
closeCtx(ctx);
ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
}
@ -433,11 +464,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} catch (RuntimeException e) {
log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
closeCtx(ctx);
ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
} catch (AdaptorException | ThingsboardException | InvalidProtocolBufferException e) {
log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
}
@ -530,11 +561,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
attrReqTopicType = TopicType.V2;
} else {
transportService.recordActivity(deviceSessionCtx.getSessionInfo());
ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID);
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
}
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId);
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
}
@ -548,13 +579,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) {
private void sendResponseForAdaptorErrorOrCloseContext(ChannelHandlerContext ctx, String topicName, int msgId) {
if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) {
log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.PAYLOAD_FORMAT_INVALID));
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID.byteValue()));
} else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID);
}
}
@ -593,7 +624,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void ack(ChannelHandlerContext ctx, int msgId, ReturnCode returnCode) {
private void ack(ChannelHandlerContext ctx, int msgId, MqttReasonCodes.PubAck returnCode) {
ack(ctx, msgId, returnCode.byteValue());
}
private void ack(ChannelHandlerContext ctx, int msgId, byte returnCode) {
if (msgId > 0) {
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode));
}
@ -604,13 +639,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess(Void dummy) {
log.trace("[{}] Published msg: {}", sessionId, msg);
ack(ctx, msgId, ReturnCode.SUCCESS);
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
};
}
@ -629,7 +664,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess(TransportProtos.ProvisionDeviceResponseMsg provisionResponseMsg) {
log.trace("[{}] Published msg: {}", sessionId, msg);
ack(ctx, msgId, ReturnCode.SUCCESS);
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
try {
if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) {
deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
@ -645,8 +680,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
closeCtx(ctx);
ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
}
@ -681,13 +716,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to get firmware: {}", sessionId, msg, e);
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
}
private void sendOtaPackage(ChannelHandlerContext ctx, int msgId, String firmwareId, String requestId, int chunkSize, int chunk, OtaPackageType type) {
log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId);
ack(ctx, msgId, ReturnCode.SUCCESS);
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
try {
byte[] firmwareChunk = context.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk);
deviceSessionCtx.getPayloadAdaptor()
@ -703,13 +738,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx.getChannel().writeAndFlush(deviceSessionCtx
.getPayloadAdaptor()
.createMqttPublishMsg(deviceSessionCtx, MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC, error.getBytes()));
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
int returnCode = ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.NOT_AUTHORIZED_5);
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(returnCode)));
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF)));
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@ -718,7 +752,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)){
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
continue;
@ -789,13 +823,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.TOPIC_FILTER_INVALID));
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
break;
}
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.IMPLEMENTATION_SPECIFIC));
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR));
}
}
if (!activityReported) {
@ -832,7 +866,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(ReturnCode.NOT_AUTHORIZED_5.shortValue())));
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(),
Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue())));
return;
}
boolean activityReported = false;
@ -843,7 +878,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (mqttQoSMap.containsKey(matcher)) {
mqttQoSMap.remove(matcher);
try {
short resultValue = ReturnCode.SUCCESS.shortValue();
short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue();
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
@ -884,16 +919,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
default:
log.trace("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
resultValue = ReturnCode.TOPIC_FILTER_INVALID.shortValue();
resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue();
}
unSubResults.add(resultValue);
} catch (Exception e) {
log.debug("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
unSubResults.add(ReturnCode.IMPLEMENTATION_SPECIFIC.shortValue());
unSubResults.add((short) MqttReasonCodes.UnsubAck.IMPLEMENTATION_SPECIFIC_ERROR.byteValue());
}
} else {
log.debug("[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found", sessionId, mqttMsg.variableHeader().messageId(), topicName);
unSubResults.add(ReturnCode.NO_SUBSCRIPTION_EXISTED.shortValue());
unSubResults.add((short)MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue());
}
}
if (!activityReported) {
@ -918,7 +953,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));
if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
deviceSessionCtx.setProvisionOnly(true);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED, msg));
} else {
X509Certificate cert;
if (sslHandler != null && (cert = getX509Certificate()) != null) {
@ -952,8 +987,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx, MqttReasonCodes.Disconnect.SERVER_BUSY);
}
});
}
@ -975,15 +1010,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
});
} catch (Exception e) {
context.onAuthFailure(address);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.NOT_AUTHORIZED_5, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5, connectMessage));
log.trace("[{}] X509 auth failure: {}", sessionId, address, e);
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.NOT_AUTHORIZED);
}
}
@ -1000,7 +1035,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return null;
}
private MqttConnAckMessage createMqttConnAckMsg(ReturnCode returnCode, MqttConnectMessage msg) {
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) {
MqttMessageBuilders.ConnAckBuilder connAckBuilder = MqttMessageBuilders.connAck();
connAckBuilder.sessionPresent(!msg.variableHeader().isCleanSession());
MqttConnectReturnCode finalReturnCode = ReturnCodeResolver.getConnectionReturnCode(deviceSessionCtx.getMqttVersion(), returnCode);
@ -1031,18 +1066,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.error("[{}] Unexpected Exception", sessionId, cause);
}
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.SERVER_SHUTTING_DOWN);
if (cause instanceof OutOfMemoryError) {
log.error("Received critical error. Going to shutdown the service.");
System.exit(1);
}
}
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> reasonCodes) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(SUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(reasonCodes);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
}
@ -1061,14 +1096,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
public static MqttMessage createMqttPubAckMsg(DeviceSessionCtx deviceSessionCtx, int requestId, ReturnCode returnCode) {
public static MqttMessage createMqttPubAckMsg(DeviceSessionCtx deviceSessionCtx, int requestId, byte returnCode) {
MqttMessageBuilders.PubAckBuilder pubAckMsgBuilder = MqttMessageBuilders.pubAck().packetId(requestId);
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
pubAckMsgBuilder.reasonCode(returnCode.byteValue());
pubAckMsgBuilder.reasonCode(returnCode);
}
return pubAckMsgBuilder.build();
}
public static MqttMessage createMqttDisconnectMsg(DeviceSessionCtx deviceSessionCtx, byte returnCode) {
MqttMessageBuilders.DisconnectBuilder disconnectBuilder = MqttMessageBuilders.disconnect();
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
disconnectBuilder.reasonCode(returnCode);
}
return disconnectBuilder.build();
}
private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
if (deviceSessionCtx.isConnected()) {
return true;
@ -1115,8 +1158,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
} catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
}
@ -1160,20 +1203,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
if (!msg.hasDeviceInfo()) {
context.onAuthFailure(address);
ReturnCode returnCode = ReturnCode.NOT_AUTHORIZED_5;
MqttConnectReturnCode returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
if (sslHandler == null || getX509Certificate() == null) {
String username = connectMessage.payload().userName();
byte[] passwordBytes = connectMessage.payload().passwordInBytes();
String clientId = connectMessage.payload().clientIdentifier();
if ((username != null && passwordBytes != null && clientId != null)
|| (username == null ^ passwordBytes == null)) {
returnCode = ReturnCode.BAD_USERNAME_OR_PASSWORD;
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
} else if (!StringUtils.isBlank(clientId)) {
returnCode = ReturnCode.CLIENT_IDENTIFIER_NOT_VALID;
returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
}
}
ctx.writeAndFlush(createMqttConnAckMsg(returnCode, connectMessage));
closeCtx(ctx);
closeCtx(ctx, returnCode);
} else {
context.onAuthSuccess(address);
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
@ -1188,7 +1231,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else {
checkGatewaySession(sessionMetaData);
}
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED, connectMessage));
deviceSessionCtx.setConnected(true);
log.debug("[{}] Client connected!", sessionId);
transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
@ -1198,11 +1241,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onError(Throwable e) {
if (e instanceof TbRateLimitsException) {
log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage());
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED, connectMessage));
closeCtx(ctx, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
} else {
log.warn("[{}] Failed to submit session event", sessionId, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR);
}
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
closeCtx(ctx);
}
});
}
@ -1231,8 +1276,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
SparkplugMessageType.NCMD);
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(sparkplugSessionHandler::writeAndFlush);
}
});
@ -1250,7 +1295,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
closeCtx(deviceSessionCtx.getChannel());
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR;
switch (sessionCloseNotification.getReason()) {
case CREDENTIALS_UPDATED:
returnCode = MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION;
break;
case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED:
returnCode = MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER;
break;
case SESSION_TIMEOUT:
returnCode = MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME;
break;
}
closeCtx(deviceSessionCtx.getChannel(), returnCode);
}
@Override
@ -1287,8 +1344,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
messageType);
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
} else {
sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
@ -1369,7 +1426,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceDeleted(DeviceId deviceId) {
context.onAuthFailure(address);
ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
closeCtx(ctx);
closeCtx(ctx, MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION);
}
public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {

View File

@ -20,8 +20,8 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
@ -146,6 +146,12 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! Device name: {} ReasonCode: {}", ctx.getSessionId(), deviceName, reasonCode);
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException {
return protoAdaptor.convertToPublish(ctx, firmwareChunk, requestId, chunk, firmwareType);

View File

@ -154,6 +154,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, JsonConverter.toJson(provisionResponse)));
}
@Override
public Optional<MqttMessage> convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_DISCONNECT_TOPIC, JsonConverter.toGatewayDeviceDisconnectJson(deviceName, reasonCode)));
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) {
return Optional.of(createMqttPublishMsg(ctx, String.format(DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT, firmwareType.getKeyPrefix(), requestId, chunk), firmwareChunk));

View File

@ -80,6 +80,8 @@ public interface MqttTransportAdaptor {
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;
Optional<MqttMessage> convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) throws AdaptorException;
default MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, byte[] payloadInBytes) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);

View File

@ -173,6 +173,15 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, provisionResponse.toByteArray()));
}
@Override
public Optional<MqttMessage> convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) {
TransportProtos.GatewayDisconnectDeviceMsg gatewayDeviceDisconnectMsg = TransportProtos.GatewayDisconnectDeviceMsg.newBuilder()
.setDeviceName(deviceName)
.setReasonCode(reasonCode)
.build();
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_DISCONNECT_TOPIC, gatewayDeviceDisconnectMsg.toByteArray()));
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException {
return Optional.of(createMqttPublishMsg(ctx, String.format(DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT, firmwareType.getKeyPrefix(), requestId, chunk), firmwareChunk));

View File

@ -31,6 +31,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Getter;
import lombok.Setter;
@ -47,6 +48,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
@ -60,7 +62,6 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
import java.util.ArrayList;
@ -84,7 +85,6 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.ReturnCode.PAYLOAD_FORMAT_INVALID;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
@ -230,7 +230,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.trace("[{}][{}][{}] onDeviceConnect: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
process(onDeviceConnect(deviceName, deviceType),
result -> {
ack(msg, ReturnCode.SUCCESS);
ack(msg, MqttReasonCodes.PubAck.SUCCESS);
log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
},
t -> logDeviceCreationError(t, deviceName));
@ -361,7 +361,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg, ReturnCode.SUCCESS);
ack(msg, MqttReasonCodes.PubAck.SUCCESS);
}
protected void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
@ -618,7 +618,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId),
t -> {
failedToProcessLog(deviceName, ATTRIBUTES_REQUEST, t);
ack(msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ack(mqttMsg, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
});
}
@ -663,20 +663,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
return ProtoMqttAdaptor.toBytes(payload);
}
protected void ack(MqttPublishMessage msg, ReturnCode returnCode) {
protected void ack(MqttPublishMessage msg, MqttReasonCodes.PubAck returnCode) {
int msgId = getMsgId(msg);
ack(msgId, returnCode);
}
protected void ack(int msgId, ReturnCode returnCode) {
protected void ack(int msgId, MqttReasonCodes.PubAck returnCode) {
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode));
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode.byteValue()));
}
}
protected void ackOrClose(int msgId) {
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
ack(msgId, PAYLOAD_FORMAT_INVALID);
ack(msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);
} else {
channel.close();
}
@ -707,13 +707,22 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
public void onSuccess(Void dummy) {
log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg);
if (msgId > 0) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS));
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue()));
} else {
log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg);
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue()));
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET);
}
}
@Override
public void onError(Throwable e) {
log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e);
if (e instanceof TbRateLimitsException) {
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH);
} else {
closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR);
}
ctx.close();
}
};
@ -737,4 +746,17 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
}
private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) {
try {
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
MqttTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
int returnCodeValue = returnCode.byteValue() & 0xFF;
Optional<MqttMessage> deviceDisconnectPublishMsg = adaptor.convertToGatewayDeviceDisconnectPublish(deviceSessionCtx, deviceName, returnCodeValue);
deviceDisconnectPublishMsg.ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
}
} catch (Exception e) {
log.trace("Failed to send device disconnect to gateway session", e);
}
}
}

View File

@ -1,104 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util;
public enum ReturnCode {
SUCCESS((byte) 0x00),
//MQTT 3 codes
UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01),
IDENTIFIER_REJECTED((byte) 0x02),
SERVER_UNAVAILABLE((byte) 0x03),
BAD_USER_NAME_OR_PASSWORD((byte) 0x04),
NOT_AUTHORIZED((byte) 0x05),
//MQTT 5 codes
NO_MATCHING_SUBSCRIBERS((byte) 0x10),
NO_SUBSCRIPTION_EXISTED((byte) 0x11),
CONTINUE_AUTHENTICATION((byte) 0x18),
REAUTHENTICATE((byte) 0x19),
UNSPECIFIED_ERROR((byte) 0x80),
MALFORMED_PACKET((byte) 0x81),
PROTOCOL_ERROR((byte) 0x82),
IMPLEMENTATION_SPECIFIC((byte) 0x83),
UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84),
CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85),
BAD_USERNAME_OR_PASSWORD((byte) 0x86),
NOT_AUTHORIZED_5((byte) 0x87),
SERVER_UNAVAILABLE_5((byte) 0x88),
SERVER_BUSY((byte) 0x89),
BANNED((byte) 0x8A),
SERVER_SHUTTING_DOWN((byte) 0x8B),
BAD_AUTHENTICATION_METHOD((byte) 0x8C),
KEEP_ALIVE_TIMEOUT((byte) 0x8D),
SESSION_TAKEN_OVER((byte) 0x8E),
TOPIC_FILTER_INVALID((byte) 0x8F),
TOPIC_NAME_INVALID((byte) 0x90),
PACKET_IDENTIFIER_IN_USE((byte) 0x91),
PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92),
RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93),
TOPIC_ALIAS_INVALID((byte) 0x94),
PACKET_TOO_LARGE((byte) 0x95),
MESSAGE_RATE_TOO_HIGH((byte) 0x96),
QUOTA_EXCEEDED((byte) 0x97),
ADMINISTRATIVE_ACTION((byte) 0x98),
PAYLOAD_FORMAT_INVALID((byte) 0x99),
RETAIN_NOT_SUPPORTED((byte) 0x9A),
QOS_NOT_SUPPORTED((byte) 0x9B),
USE_ANOTHER_SERVER((byte) 0x9C),
SERVER_MOVED((byte) 0x9D),
SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E),
CONNECTION_RATE_EXCEEDED((byte) 0x9F),
MAXIMUM_CONNECT_TIME((byte) 0xA0),
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1),
WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2);
private static final ReturnCode[] VALUES;
static {
ReturnCode[] values = values();
VALUES = new ReturnCode[163];
for (ReturnCode code : values) {
final int unsignedByte = code.byteValue & 0xFF;
// Suppress a warning about out of bounds access since the enum contains only correct values
VALUES[unsignedByte] = code; // lgtm [java/index-out-of-bounds]
}
}
private final byte byteValue;
ReturnCode(byte byteValue) {
this.byteValue = byteValue;
}
public byte byteValue() {
return byteValue;
}
public short shortValue(){return byteValue;}
public static ReturnCode valueOf(byte b) {
final int unsignedByte = b & 0xFF;
ReturnCode mqttConnectReturnCode = null;
try {
mqttConnectReturnCode = VALUES[unsignedByte];
} catch (ArrayIndexOutOfBoundsException ignored) {
// no op
}
if (mqttConnectReturnCode == null) {
throw new IllegalArgumentException("unknown connect return code: " + unsignedByte);
}
return mqttConnectReturnCode;
}
}

View File

@ -17,22 +17,24 @@ package org.thingsboard.server.transport.mqtt.util;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ReturnCodeResolver {
public static MqttConnectReturnCode getConnectionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) {
public static MqttConnectReturnCode getConnectionReturnCode(MqttVersion mqttVersion, MqttConnectReturnCode returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !MqttConnectReturnCode.CONNECTION_ACCEPTED.equals(returnCode)) {
switch (returnCode) {
case BAD_USERNAME_OR_PASSWORD:
case NOT_AUTHORIZED_5:
case CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD:
case CONNECTION_REFUSED_NOT_AUTHORIZED_5:
return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
case SERVER_UNAVAILABLE_5:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
case CLIENT_IDENTIFIER_NOT_VALID:
case CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID:
return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
case CONNECTION_REFUSED_SERVER_UNAVAILABLE_5:
case CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
default:
log.warn("Unknown return code for conversion: {}", returnCode.name());
}
@ -40,18 +42,20 @@ public class ReturnCodeResolver {
return MqttConnectReturnCode.valueOf(returnCode.byteValue());
}
public static int getSubscriptionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) {
public static int getSubscriptionReturnCode(MqttVersion mqttVersion, MqttReasonCodes.SubAck returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !(MqttReasonCodes.SubAck.GRANTED_QOS_0.equals(returnCode) ||
MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(returnCode) ||
MqttReasonCodes.SubAck.GRANTED_QOS_2.equals(returnCode))) {
switch (returnCode) {
case UNSPECIFIED_ERROR:
case TOPIC_FILTER_INVALID:
case IMPLEMENTATION_SPECIFIC:
case NOT_AUTHORIZED_5:
case IMPLEMENTATION_SPECIFIC_ERROR:
case NOT_AUTHORIZED:
case PACKET_IDENTIFIER_IN_USE:
case QUOTA_EXCEEDED:
case SHARED_SUBSCRIPTION_NOT_SUPPORTED:
case SHARED_SUBSCRIPTIONS_NOT_SUPPORTED:
case SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED:
case WILDCARD_SUBSCRIPTION_NOT_SUPPORTED:
case WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED:
return MqttQoS.FAILURE.value();
}
}

View File

@ -45,6 +45,8 @@ import java.util.Random;
@Listeners(TestListener.class)
public abstract class AbstractContainerTest {
protected static final int TIMEOUT = 30;
protected final static String TEST_PROVISION_DEVICE_KEY = "test_provision_key";
protected final static String TEST_PROVISION_DEVICE_SECRET = "test_provision_secret";
protected static long timeoutMultiplier = 1;

View File

@ -23,7 +23,12 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
@ -33,6 +38,7 @@ import org.testng.annotations.Test;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientCallback;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttHandler;
import org.thingsboard.server.common.data.DataConstants;
@ -56,7 +62,9 @@ import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
@ -65,6 +73,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.fail;
@ -76,6 +85,9 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi
@Slf4j
public class MqttClientTest extends AbstractContainerTest {
private static final String TRANSPORT_HOST = "localhost";
private static final int TRANSPORT_PORT = 1883;
private Device device;
AbstractListeningExecutor handlerExecutor;
@ -100,6 +112,7 @@ public class MqttClientTest extends AbstractContainerTest {
handlerExecutor.destroy();
}
}
@Test
public void telemetryUpload() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
@ -194,7 +207,7 @@ public class MqttClientTest extends AbstractContainerTest {
String sharedAttributeValue = StringUtils.randomAlphanumeric(8);
sharedAttributes.addProperty("sharedAttr", sharedAttributeValue);
JsonNode sharedAttribute = mapper.readTree(sharedAttributes.toString());
testRestClient.postTelemetryAttribute(DataConstants.DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute);
testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute);
// Subscribe to attributes response
mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get();
@ -317,8 +330,8 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get();
// Check the response from the server
TimeUnit.SECONDS.sleep(1 * timeoutMultiplier);
MqttEvent responseFromServer = listener.getEvents().poll(1 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
MqttEvent responseFromServer = listener.getEvents().poll(3 * timeoutMultiplier, TimeUnit.SECONDS);
Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
assertThat(responseId).isEqualTo(requestId);
assertThat(mapper.readTree(responseFromServer.getMessage()).get("response").asText()).isEqualTo("requestReceived");
@ -444,6 +457,84 @@ public class MqttClientTest extends AbstractContainerTest {
assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND");
}
@Test
public void regularDisconnect() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5);
final List<Byte> returnCodeByteValue = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
mqttClient.disconnect();
Thread.sleep(1000);
assertThat(returnCodeByteValue.size()).isEqualTo(1);
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0));
assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT);
}
@Test
public void clientSessionTakenOverDisconnect() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5);
final List<Byte> returnCodeByteValue = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
Thread.sleep(1000);
MqttMessageListener dummyListener = new MqttMessageListener();
MqttClient dummyMqttClient = getMqttClient(deviceCredentials, dummyListener, MqttVersion.MQTT_5);
final List<Byte> returnCodeByteValueSecondClient = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCodeDummy = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValueSecondClient);
dummyMqttClient.setCallback(callbackForDisconnectWithReturnCodeDummy);
Awaitility
.await()
.alias("Check device disconnect.")
.atMost(TIMEOUT*timeoutMultiplier, TimeUnit.SECONDS)
.until(() -> returnCodeByteValue.size() > 0);
assertThat(returnCodeByteValueSecondClient).isEmpty();
assertThat(returnCodeByteValue).isNotEmpty();
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0));
dummyMqttClient.disconnect();
assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER);
}
@Test
public void clientPublishForRegularTopicByProvisionClient() throws Exception {
MqttClient mqttClient = getMqttClient("provision", new MqttMessageListener(), MqttVersion.MQTT_5);
final List<Byte> returnCodeByteValue = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer("test".getBytes()), MqttQoS.AT_LEAST_ONCE).get();
Thread.sleep(1000);
assertThat(returnCodeByteValue).isNotEmpty();
MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0));
assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.TOPIC_NAME_INVALID);
}
@Test
public void clientConnectWithBadCredentials() throws Exception {
MqttClient mqttClient = getMqttClient("unknownAccessToken", new MqttMessageListener(), MqttVersion.MQTT_5, false);
final List<Byte> returnCodeByteValue = new ArrayList<>();
MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue);
mqttClient.setCallback(callbackForDisconnectWithReturnCode);
try {
mqttClient.connect(TRANSPORT_HOST, TRANSPORT_PORT).get(1, TimeUnit.SECONDS);
} catch (TimeoutException ignored) {
}
assertThat(returnCodeByteValue).isNotEmpty();
MqttConnectReturnCode returnCode = MqttConnectReturnCode.valueOf(returnCodeByteValue.get(0));
assertThat(returnCode).isIn(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
}
private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
RuleChain newRuleChain = new RuleChain();
newRuleChain.setName("testRuleChain");
@ -477,8 +568,34 @@ public class MqttClientTest extends AbstractContainerTest {
return defaultRuleChain.get().getId();
}
private MqttClientCallback getCallbackWrapperForDisconnectWithReturnCode(List<Byte> returnCodeByteValueWrapper) {
return new MqttClientCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void onSuccessfulReconnect() {
}
@Override
public void onDisconnect(MqttMessage mqttDisconnectMessage) {
log.info("Disconnected with reason: {}", mqttDisconnectMessage);
returnCodeByteValueWrapper.add(((MqttReasonCodeAndPropertiesVariableHeader) mqttDisconnectMessage.variableHeader()).reasonCode());
}
};
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
return getMqttClient(deviceCredentials.getCredentialsId(), listener);
return getMqttClient(deviceCredentials.getCredentialsId(), listener, MqttVersion.MQTT_3_1_1, true);
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener, MqttVersion mqttVersion) throws InterruptedException, ExecutionException {
return getMqttClient(deviceCredentials.getCredentialsId(), listener, mqttVersion, true);
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener, MqttVersion mqttVersion, boolean connect) throws InterruptedException, ExecutionException {
return getMqttClient(deviceCredentials.getCredentialsId(), listener, mqttVersion, connect);
}
private String getOwnerId() {
@ -486,12 +603,23 @@ public class MqttClientTest extends AbstractContainerTest {
}
private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
return getMqttClient(username, listener, MqttVersion.MQTT_3_1_1, true);
}
private MqttClient getMqttClient(String username, MqttMessageListener listener, MqttVersion mqttVersion) throws InterruptedException, ExecutionException {
return getMqttClient(username, listener, mqttVersion, true);
}
private MqttClient getMqttClient(String username, MqttMessageListener listener, MqttVersion mqttVersion, boolean connect) throws InterruptedException, ExecutionException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setOwnerId(getOwnerId());
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(username);
clientConfig.setProtocolVersion(mqttVersion);
MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor);
mqttClient.connect("localhost", 1883).get();
if (connect) {
mqttClient.connect(TRANSPORT_HOST, TRANSPORT_PORT).get();
}
return mqttClient;
}

View File

@ -84,6 +84,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
case PUBCOMP:
handlePubcomp(msg);
break;
case DISCONNECT:
handleDisconnect(msg);
break;
}
} else {
log.error("[{}] Message decoding failed: {}", client.getClientConfig().getClientId(), msg.decoderResult().cause().getMessage());
@ -204,6 +207,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
// Don't start reconnect logic here
break;
}
if (this.client.getCallback() != null) {
this.client.getCallback().onConnAck(message);
}
}
private void handleSubAck(MqttSubAckMessage message) {
@ -224,6 +230,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
if (!pendingSubscription.getFuture().isDone()) {
pendingSubscription.getFuture().setSuccess(null);
}
if (this.client.getCallback() != null) {
this.client.getCallback().onSubAck(message);
}
}
private void handlePublish(Channel channel, MqttPublishMessage message) {
@ -267,6 +276,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
this.client.getServerSubscriptions().remove(unsubscription.getTopic());
unsubscription.getFuture().setSuccess(null);
this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
if (this.client.getCallback() != null) {
this.client.getCallback().onUnsubAck(message);
}
}
private void handlePuback(MqttPubAckMessage message) {
@ -278,6 +290,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
pendingPublish.onPubackReceived();
this.client.getPendingPublishes().remove(message.variableHeader().messageId());
pendingPublish.getPayload().release();
if (this.client.getCallback() != null) {
this.client.getCallback().onPubAck(message);
}
}
private void handlePubrec(Channel channel, MqttMessage message) {
@ -301,7 +316,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
future = Futures.transform(future, x -> {
this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
return null;
}, MoreExecutors.directExecutor());
}, MoreExecutors.directExecutor());
}
future.addListener(() -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
@ -319,6 +334,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
pendingPublish.onPubcompReceived();
}
private void handleDisconnect(MqttMessage message) {
if (this.client.getCallback() != null) {
this.client.getCallback().onDisconnect(message);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {

View File

@ -15,6 +15,12 @@
*/
package org.thingsboard.mqtt;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
/**
* Created by Valerii Sosliuk on 12/30/2017.
*/
@ -32,4 +38,19 @@ public interface MqttClientCallback {
*
*/
void onSuccessfulReconnect();
default void onConnAck(MqttConnAckMessage connAckMessage) {
}
default void onPubAck(MqttPubAckMessage pubAckMessage) {
}
default void onSubAck(MqttSubAckMessage pubAckMessage) {
}
default void onUnsubAck(MqttUnsubAckMessage unsubAckMessage) {
}
default void onDisconnect(MqttMessage mqttDisconnectMessage) {
}
}

View File

@ -45,6 +45,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ListeningExecutor;
@ -87,6 +88,7 @@ final class MqttClientImpl implements MqttClient {
private volatile boolean reconnect = false;
private String host;
private int port;
@Getter
private MqttClientCallback callback;
private final ListeningExecutor handlerExecutor;
@ -426,7 +428,12 @@ final class MqttClientImpl implements MqttClient {
disconnected = true;
if (this.channel != null) {
MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
this.sendAndFlushPacket(message).addListener(future1 -> channel.close());
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
eventLoop.schedule(() -> {
if (!channelFuture.isDone()) {
this.channel.close();
}
}, 500, TimeUnit.MILLISECONDS);
}
}

View File

@ -143,6 +143,8 @@ transport:
proxy_enabled: "${MQTT_PROXY_PROTOCOL_ENABLED:false}"
# MQTT processing timeout in milliseconds
timeout: "${MQTT_TIMEOUT:10000}"
# MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism
netty:
# Netty leak detector level