fixed missing ACK when message publishing fails

This commit is contained in:
dashevchenko 2025-09-29 11:44:27 +03:00
parent 81ce59db1e
commit f9b52c3a43

View File

@ -412,7 +412,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
t -> failedToProcessLog(deviceName, TELEMETRY, t));
t -> processFailure(msgId, deviceName, TELEMETRY, t));
}
}
@ -444,7 +444,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
deviceMsgList.forEach(telemetryMsg -> {
String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
process(deviceName, deviceCtx -> processPostTelemetryMsg(deviceCtx, telemetryMsg.getMsg(), deviceName, msgId),
t -> failedToProcessLog(deviceName, TELEMETRY, t));
t -> processFailure(msgId, deviceName, TELEMETRY, t));
});
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
@ -483,7 +483,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
t -> failedToProcessLog(deviceName, CLAIMING, t));
t -> processFailure(msgId, deviceName, CLAIMING, t));
}
}
@ -510,7 +510,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
claimMsgList.forEach(claimDeviceMsg -> {
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
process(deviceName, deviceCtx -> processClaimDeviceMsg(deviceCtx, claimDeviceMsg.getClaimRequest(), deviceName, msgId),
t -> failedToProcessLog(deviceName, CLAIMING, t));
t -> processFailure(msgId, deviceName, CLAIMING, t));
});
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
@ -539,7 +539,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
String deviceName = deviceEntry.getKey();
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, deviceEntry.getValue(), deviceName, msgId),
t -> failedToProcessLog(deviceName, ATTRIBUTE, t));
t -> processFailure(msgId, deviceName, ATTRIBUTE, t));
}
}
@ -565,7 +565,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
attributesMsgList.forEach(attributesMsg -> {
String deviceName = checkDeviceName(attributesMsg.getDeviceName());
process(deviceName, deviceCtx -> processPostAttributesMsg(deviceCtx, attributesMsg.getMsg(), deviceName, msgId),
t -> failedToProcessLog(deviceName, ATTRIBUTE, t));
t -> processFailure(msgId, deviceName, ATTRIBUTE, t));
});
} catch (RuntimeException | InvalidProtocolBufferException e) {
throw new AdaptorException(e);
@ -648,7 +648,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void onDeviceRpcResponse(Integer requestId, String data, String deviceName, int msgId) {
process(deviceName, deviceCtx -> processRpcResponseMsg(deviceCtx, requestId, data, deviceName, msgId),
t -> failedToProcessLog(deviceName, RPC_RESPONSE, t));
t -> processFailure(msgId, deviceName, RPC_RESPONSE, t));
}
private void processRpcResponseMsg(MqttDeviceAwareSessionContext deviceCtx, Integer requestId, String data, String deviceName, int msgId) {
@ -661,8 +661,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
int msgId = getMsgId(mqttMsg);
process(deviceName, deviceCtx -> processGetAttributeRequestMessage(deviceCtx, requestMsg, deviceName, msgId),
t -> {
failedToProcessLog(deviceName, ATTRIBUTES_REQUEST, t);
ack(mqttMsg, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
processFailure(msgId, deviceName, ATTRIBUTES_REQUEST, t, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR);
});
}
@ -790,10 +789,19 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
}
protected void failedToProcessLog(String deviceName, String msgType, Throwable t) {
protected void processFailure(int msgId, String deviceName, String msgType, Throwable t) {
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
if (DataConstants.MAXIMUM_NUMBER_OF_DEVICES_REACHED.equals(t.getMessage())) {
processFailure(msgId, deviceName, msgType, t, MqttReasonCodes.PubAck.QUOTA_EXCEEDED);
} else {
processFailure(msgId, deviceName, msgType, t, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR);
}
}
protected void processFailure(int msgId, String deviceName, String msgType, Throwable t, MqttReasonCodes.PubAck pubAck) {
log.debug("[{}][{}][{}] Failed to process device {} command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msgType, deviceName, t);
ack(msgId, pubAck);
}
private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) {
try {