Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Nydusd: introduce runtime streaming prefetching #1538

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
31 changes: 16 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 57 additions & 2 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ impl Rafs {
}
if self.fs_prefetch {
// Device should be ready before any prefetch.
self.device.start_prefetch();
self.prefetch(r, prefetch_files);
// self.device.start_prefetch();
// self.prefetch(r, prefetch_files);
self.device.init_stream_prefetch();
self.start_stream_prefetch(r, prefetch_files);
}
self.initialized = true;

Expand Down Expand Up @@ -327,6 +329,7 @@ impl Rafs {
}

impl Rafs {
#[allow(unused)]
fn prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
let sb = self.sb.clone();
let device = self.device.clone();
Expand All @@ -338,6 +341,18 @@ impl Rafs {
});
}

#[allow(unused)]
fn start_stream_prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
let sb = self.sb.clone();
let device = self.device.clone();
let prefetch_all = self.prefetch_all;
let root_ino = self.root_ino();

let _ = std::thread::spawn(move || {
Self::do_stream_prefetch(root_ino, reader, prefetch_files, prefetch_all, sb, device);
});
}

/// for blobfs
pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> Result<()> {
self.device.fetch_range_synchronous(prefetches)
Expand All @@ -347,6 +362,7 @@ impl Rafs {
self.sb.superblock.root_ino()
}

#[allow(unused)]
fn do_prefetch(
root_ino: u64,
mut reader: RafsIoReader,
Expand Down Expand Up @@ -472,6 +488,45 @@ impl Rafs {
}
}

fn do_stream_prefetch(
root_ino: u64,
mut reader: RafsIoReader,
prefetch_files: Option<Vec<PathBuf>>,
_prefetch_all: bool,
sb: Arc<RafsSuper>,
device: BlobDevice,
) {
// Bootstrap has non-empty prefetch table indicating a full prefetch
let inlay_prefetch_all = sb
.is_inlay_prefetch_all(&mut reader)
.map_err(|e| error!("Detect prefetch table error {}", e))
.unwrap_or_default();

// Nydusd has a CLI option indicating a full prefetch
let startup_prefetch_all = prefetch_files
.as_ref()
.map(|f| f.len() == 1 && f[0].as_os_str() == "/")
.unwrap_or(false);

// User specified prefetch files have high priority to be prefetched.
// Moreover, user specified prefetch files list will override those on-disk prefetch table.
if !startup_prefetch_all && !inlay_prefetch_all {
// Then do file based prefetch based on:
// - prefetch listed passed in by user
// - or file prefetch list in metadata
// TODO: change this to iterator
let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
let res = sb.stream_prefetch_files(&device, &mut reader, root_ino, inodes);
match res {
Ok(true) => {
info!("Root inode was found, but it should not prefetch all files!")
}
Ok(false) => {}
Err(e) => error!("No file to be prefetched {:?}", e),
}
}
}

fn convert_file_list(files: &[PathBuf], sb: &Arc<RafsSuper>) -> Vec<Inode> {
let mut inodes = Vec::<Inode>::with_capacity(files.len());

Expand Down
1 change: 1 addition & 0 deletions rafs/src/metadata/direct_v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ impl RafsInode for OndiskInodeWrapper {
curr_chunk_index == tail_chunk_index,
)
.ok_or_else(|| einval!("failed to get chunk information"))?;
//TODO:这里应该考虑某个中间的chunk的blob_index不同的情况
if desc.blob.blob_index() != descs.blob_index() {
vec.push(descs);
descs = BlobIoVec::new(desc.blob.clone());
Expand Down
68 changes: 68 additions & 0 deletions rafs/src/metadata/md_v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,74 @@ impl RafsSuper {

Ok(found_root_inode)
}

pub(crate) fn stream_prefetch_data_v6(
&self,
device: &BlobDevice,
rafs_reader: &mut RafsIoReader,
root_ino: Inode,
) -> RafsResult<bool> {
let hint_entries = self.meta.prefetch_table_entries as usize;
if hint_entries == 0 {
return Ok(false);
}

// Try to prefetch according to the list of files specified by the
// builder's `--prefetch-policy fs` option.
let mut prefetch_table = RafsV6PrefetchTable::new();
prefetch_table
.load_prefetch_table_from(rafs_reader, self.meta.prefetch_table_offset, hint_entries)
.map_err(|e| {
error!("Failed in loading hint prefetch table at offset {}", e);
RafsError::Prefetch(format!(
"Failed in loading hint prefetch table at offset {}. {:?}",
self.meta.prefetch_table_offset, e
))
})?;
// debug!("prefetch table contents {:?}", prefetch_table);

let mut hardlinks: HashSet<u64> = HashSet::new();
let mut fetched_ranges: HashMap<u32, HashSet<u32>> = HashMap::new();
let blob_ccis = device.get_all_blob_cci();

let mut found_root_inode = false;
for ino in prefetch_table.inodes {
// Inode number 0 is invalid, it was added because prefetch table has to be aligned.
if ino == 0 {
break;
}
if ino as Inode == root_ino {
found_root_inode = true;
}
// debug!("CMDebug: hint prefetch inode {}", ino);

let ranges = self
.get_inode_ranges(
ino as u64,
&mut hardlinks,
&mut fetched_ranges,
device,
&blob_ccis,
)
.map_err(|e| {
RafsError::Prefetch(format!("Failed in get inode chunk ranges. {:?}", e))
})?;

// debug!("CMDebug: prefetch inode: {}, ranges: {:?}", ino, ranges);

for r in ranges {
device.add_stream_prefetch_range(r).map_err(|e| {
RafsError::Prefetch(format!("Failed to add inode prefetch range. {:?}", e))
})?;
}
}

device.flush_stream_prefetch().map_err(|e| {
RafsError::Prefetch(format!("Failed to flush inode prefetch range. {:?}", e))
})?;

Ok(found_root_inode)
}
}

#[cfg(test)]
Expand Down
Loading
Loading