diff --git a/Cargo.lock b/Cargo.lock index 0f56b0b..c326e56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,17 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -1331,6 +1342,7 @@ version = "0.9.7-alpha.0" dependencies = [ "anyhow", "async-compression", + "async-recursion", "async-stream", "async_zip", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 937256c..0859993 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ tokio-util = {version = "0.7.4", features = ["io", "full"]} tree_magic = {package = "tree_magic_mini", version = "3.0.0"} [dev-dependencies] +async-recursion = "1.0.0" ctor = "0.1.20" pretty_assertions = "1.3.0" tokio-test = "0.4.2" diff --git a/src/adapters/zip.rs b/src/adapters/zip.rs index f386160..0cf0430 100644 --- a/src/adapters/zip.rs +++ b/src/adapters/zip.rs @@ -1,17 +1,10 @@ -use std::{any::Any, io::Cursor}; - use super::*; -use crate::{adapted_iter::AdaptedFilesIter, print_bytes}; +use crate::print_bytes; use anyhow::*; use async_stream::stream; use async_zip::read::stream::ZipFileReader; use lazy_static::lazy_static; use log::*; -use tokio::{ - io::AsyncReadExt, - sync::mpsc::{self, Sender}, -}; -use tokio_stream::wrappers::ReceiverStream; static EXTENSIONS: &[&str] = &["zip"]; @@ -44,65 +37,61 @@ impl GetMetadata for ZipAdapter { } } -async fn yielder(ai: AdaptInfo, s: Sender>) -> Result<()> { - let AdaptInfo { - inp, - filepath_hint, - archive_recursion_depth, - postprocess, - line_prefix, - config, - .. - } = ai; - let mut zip = ZipFileReader::new(inp); - - while !zip.finished() { - if let Some(mut reader) = zip.entry_reader().await? { - let file = reader.entry(); - if file.filename().ends_with("/") { - continue; - } - debug!( - "{}{}|{}: {} ({} packed)", - line_prefix, - filepath_hint.display(), - file.filename(), - print_bytes(file.uncompressed_size() as f64), - print_bytes(file.compressed_size() as f64) - ); - let new_line_prefix = format!("{}{}: ", line_prefix, file.filename()); - let fname = PathBuf::from(file.filename()); - drop(file); - tokio::pin!(reader); - // SAFETY: this should be solvable without unsafe but idk how :( - let reader2 = unsafe { - std::intrinsics::transmute::< - Pin<&mut (dyn AsyncRead + Send)>, - Pin<&'static mut (dyn AsyncRead + Send)>, - >(reader) - }; - s.send(Ok(AdaptInfo { - filepath_hint: fname, - is_real_file: false, - inp: Box::pin(reader2), - line_prefix: new_line_prefix, - archive_recursion_depth: archive_recursion_depth + 1, - postprocess, - config: config.clone(), - })) - .await - .map_err(|_| anyhow!("could not send adaptinfo"))?; - } - } - - Ok(()) -} - impl FileAdapter for ZipAdapter { fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result { - let (s, r) = mpsc::channel(1); - tokio::spawn(yielder(ai, s)); - Ok(Box::pin(ReceiverStream::new(r))) + // let (s, r) = mpsc::channel(1); + let AdaptInfo { + inp, + filepath_hint, + archive_recursion_depth, + postprocess, + line_prefix, + config, + .. + } = ai; + let mut zip = ZipFileReader::new(inp); + + let s = stream! { + while !zip.finished() { + if let Some(reader) = zip.entry_reader().await? { + let file = reader.entry(); + if file.filename().ends_with("/") { + continue; + } + debug!( + "{}{}|{}: {} ({} packed)", + line_prefix, + filepath_hint.display(), + file.filename(), + print_bytes(file.uncompressed_size() as f64), + print_bytes(file.compressed_size() as f64) + ); + let new_line_prefix = format!("{}{}: ", line_prefix, file.filename()); + let fname = PathBuf::from(file.filename()); + tokio::pin!(reader); + // SAFETY: this should be solvable without unsafe but idk how :( + // the issue is that ZipEntryReader borrows from ZipFileReader, but we need to yield it here into the stream + // but then it can't borrow from the ZipFile + let reader2 = unsafe { + std::intrinsics::transmute::< + Pin<&mut (dyn AsyncRead + Send)>, + Pin<&'static mut (dyn AsyncRead + Send)>, + >(reader) + }; + yield Ok(AdaptInfo { + filepath_hint: fname, + is_real_file: false, + inp: Box::pin(reader2), + line_prefix: new_line_prefix, + archive_recursion_depth: archive_recursion_depth + 1, + postprocess, + config: config.clone(), + }); + } + } + }; + + Ok(Box::pin(s)) } } @@ -145,44 +134,53 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> { #[cfg(test)] mod test { + use async_zip::{write::ZipFileWriter, Compression, ZipEntryBuilder}; + use tokio::fs::File; + use super::*; - use crate::test_utils::*; + use crate::{preproc::loop_adapt, test_utils::*}; + use pretty_assertions::assert_eq; - fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result> { - use ::zip::write::FileOptions; - use std::io::Write; + #[async_recursion::async_recursion] + async fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result> { + let v = Vec::new(); + let mut cursor = std::io::Cursor::new(v); + let mut zip = ZipFileWriter::new(&mut cursor); - // We use a buffer here, though you'd normally use a `File` - let mut zip = ::zip::ZipWriter::new(std::io::Cursor::new(Vec::new())); - - let options = FileOptions::default().compression_method(::zip::CompressionMethod::Stored); - zip.start_file(fname, options)?; - zip.write(content.as_bytes())?; + let options = ZipEntryBuilder::new(fname.to_string(), Compression::Stored); + zip.write_entry_whole(options, content.as_bytes()).await?; if add_inner { - zip.start_file("inner.zip", options)?; - zip.write(&create_zip("inner.txt", "inner text file", false)?)?; + let opts = ZipEntryBuilder::new("inner.zip".to_string(), Compression::Stored); + zip.write_entry_whole( + opts, + &create_zip("inner.txt", "inner text file", false).await?, + ) + .await?; } - // Apply the changes you've made. - // Dropping the `ZipWriter` will have the same effect, but may silently fail - Ok(zip.finish()?.into_inner()) + zip.close().await?; + Ok(cursor.into_inner()) } - #[test] - fn only_seek_zip() -> Result<()> { + #[tokio::test] + async fn only_seek_zip() -> Result<()> { let zip = test_data_dir().join("only-seek-zip.zip"); + let (a, d) = simple_adapt_info(&zip, Box::pin(File::open(&zip).await?)); + let v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a)?).await?; + assert_eq!(String::from_utf8(v)?, ""); + Ok(()) } - #[test] - fn recurse() -> Result<()> { - let zipfile = create_zip("outer.txt", "outer text file", true)?; - let adapter: Box = Box::new(ZipAdapter::new()); + #[tokio::test] + async fn recurse() -> Result<()> { + let zipfile = create_zip("outer.txt", "outer text file", true).await?; + let adapter = ZipAdapter::new(); let (a, d) = simple_adapt_info( &PathBuf::from("outer.zip"), - Box::new(std::io::Cursor::new(zipfile)), + Box::pin(std::io::Cursor::new(zipfile)), ); - let buf = adapted_to_vec(adapter.adapt(a, &d)?)?; + let buf = adapted_to_vec(loop_adapt(&adapter, d, a)?).await?; assert_eq!( String::from_utf8(buf)?,