fix zip reader kinda

This commit is contained in:
phiresky 2023-01-06 17:07:14 +05:30
parent 2b4caf067b
commit 94a037fcca
3 changed files with 99 additions and 88 deletions

12
Cargo.lock generated
View File

@ -95,6 +95,17 @@ dependencies = [
"zstd-safe", "zstd-safe",
] ]
[[package]]
name = "async-recursion"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.3" version = "0.3.3"
@ -1331,6 +1342,7 @@ version = "0.9.7-alpha.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-compression", "async-compression",
"async-recursion",
"async-stream", "async-stream",
"async_zip", "async_zip",
"bincode", "bincode",

View File

@ -56,6 +56,7 @@ tokio-util = {version = "0.7.4", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.0.0"} tree_magic = {package = "tree_magic_mini", version = "3.0.0"}
[dev-dependencies] [dev-dependencies]
async-recursion = "1.0.0"
ctor = "0.1.20" ctor = "0.1.20"
pretty_assertions = "1.3.0" pretty_assertions = "1.3.0"
tokio-test = "0.4.2" tokio-test = "0.4.2"

View File

@ -1,17 +1,10 @@
use std::{any::Any, io::Cursor};
use super::*; use super::*;
use crate::{adapted_iter::AdaptedFilesIter, print_bytes}; use crate::print_bytes;
use anyhow::*; use anyhow::*;
use async_stream::stream; use async_stream::stream;
use async_zip::read::stream::ZipFileReader; use async_zip::read::stream::ZipFileReader;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
use tokio::{
io::AsyncReadExt,
sync::mpsc::{self, Sender},
};
use tokio_stream::wrappers::ReceiverStream;
static EXTENSIONS: &[&str] = &["zip"]; static EXTENSIONS: &[&str] = &["zip"];
@ -44,65 +37,61 @@ impl GetMetadata for ZipAdapter {
} }
} }
/// Walks the zip stream in `ai.inp` entry by entry and sends one `AdaptInfo`
/// per entry through the channel `s`.
///
/// Entries whose name ends with "/" are skipped. Every item that is sent uses the
/// entry name as `filepath_hint`, appends "<entry name>: " to the incoming
/// `line_prefix`, and carries `archive_recursion_depth + 1`.
///
/// # Errors
/// Propagates the error from `zip.entry_reader()`, and returns
/// "could not send adaptinfo" when the channel send fails.
async fn yielder(ai: AdaptInfo, s: Sender<Result<AdaptInfo>>) -> Result<()> {
// Take the needed fields out of `ai`; the remaining fields are ignored.
let AdaptInfo {
inp,
filepath_hint,
archive_recursion_depth,
postprocess,
line_prefix,
config,
..
} = ai;
let mut zip = ZipFileReader::new(inp);
// Keep asking for the next entry until the reader reports it is finished.
// A `None` entry falls through and simply re-checks `finished()`.
while !zip.finished() {
if let Some(mut reader) = zip.entry_reader().await? {
let file = reader.entry();
// Names ending in "/" are not forwarded (presumably directory entries).
if file.filename().ends_with("/") {
continue;
}
// Debug log: prefix, archive path, entry name, uncompressed and packed sizes.
debug!(
"{}{}|{}: {} ({} packed)",
line_prefix,
filepath_hint.display(),
file.filename(),
print_bytes(file.uncompressed_size() as f64),
print_bytes(file.compressed_size() as f64)
);
let new_line_prefix = format!("{}{}: ", line_prefix, file.filename());
let fname = PathBuf::from(file.filename());
// `file` is not used past this point; presumably dropped to end its borrow of `reader`.
drop(file);
tokio::pin!(reader);
// SAFETY: NOTE(review) - this invariant does not look upheld by the visible code.
// The transmute replaces the real lifetime of the pinned borrow with `'static`,
// but `reader` is a local of this `if let` body and goes out of scope right after
// the `send` below resolves. Nothing here shows that the receiver has finished
// with `inp` by then, so the receiver may end up reading through a dangling
// reference. Original author's note: this should be solvable without unsafe.
let reader2 = unsafe {
std::intrinsics::transmute::<
Pin<&mut (dyn AsyncRead + Send)>,
Pin<&'static mut (dyn AsyncRead + Send)>,
>(reader)
};
// Hand the entry to the consumer as a nested, non-real file one level deeper.
s.send(Ok(AdaptInfo {
filepath_hint: fname,
is_real_file: false,
inp: Box::pin(reader2),
line_prefix: new_line_prefix,
archive_recursion_depth: archive_recursion_depth + 1,
postprocess,
config: config.clone(),
}))
.await
.map_err(|_| anyhow!("could not send adaptinfo"))?;
}
}
Ok(())
}
impl FileAdapter for ZipAdapter { impl FileAdapter for ZipAdapter {
// NOTE(review): rows in this block that show two statements appear to be the old and
// new versions of the same diff row (channel + spawned `yielder` versus inline stream).
//
// New version of `adapt`: destructures `ai`, wraps `inp` in a `ZipFileReader`, and
// returns a boxed, pinned stream that yields one `AdaptInfo` per zip entry whose
// name does not end with "/".
fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
let (s, r) = mpsc::channel(1); // let (s, r) = mpsc::channel(1);
tokio::spawn(yielder(ai, s)); let AdaptInfo {
Ok(Box::pin(ReceiverStream::new(r))) inp,
filepath_hint,
archive_recursion_depth,
postprocess,
line_prefix,
config,
..
} = ai;
let mut zip = ZipFileReader::new(inp);
// The stream body takes over `zip` and the destructured fields and produces items lazily.
// NOTE(review): `?` is used inside `stream!` below; async-stream documents `?` support
// for `try_stream!` - confirm this form compiles and surfaces the error as intended.
let s = stream! {
// Loop until the reader reports it is finished; a `None` entry just re-checks.
while !zip.finished() {
if let Some(reader) = zip.entry_reader().await? {
let file = reader.entry();
// Names ending in "/" are not yielded (presumably directory entries).
if file.filename().ends_with("/") {
continue;
}
// Debug log: prefix, archive path, entry name, uncompressed and packed sizes.
debug!(
"{}{}|{}: {} ({} packed)",
line_prefix,
filepath_hint.display(),
file.filename(),
print_bytes(file.uncompressed_size() as f64),
print_bytes(file.compressed_size() as f64)
);
let new_line_prefix = format!("{}{}: ", line_prefix, file.filename());
let fname = PathBuf::from(file.filename());
tokio::pin!(reader);
// SAFETY: NOTE(review) - this invariant does not look upheld by the visible code.
// The entry reader borrows from `zip`, yet it has to be yielded out of the stream,
// so the borrow's lifetime is transmuted to `'static`. `reader` itself is a local
// of this `if let` body and goes out of scope once the stream resumes after the
// `yield`; nothing here shows the consumer has dropped `inp` by then, so it may
// read through a dangling reference. Original author's note: this should be
// solvable without unsafe.
let reader2 = unsafe {
std::intrinsics::transmute::<
Pin<&mut (dyn AsyncRead + Send)>,
Pin<&'static mut (dyn AsyncRead + Send)>,
>(reader)
};
// Emit the entry as a nested, non-real file one recursion level deeper.
yield Ok(AdaptInfo {
filepath_hint: fname,
is_real_file: false,
inp: Box::pin(reader2),
line_prefix: new_line_prefix,
archive_recursion_depth: archive_recursion_depth + 1,
postprocess,
config: config.clone(),
});
}
}
};
Ok(Box::pin(s))
} }
} }
@ -145,44 +134,53 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use async_zip::{write::ZipFileWriter, Compression, ZipEntryBuilder};
use tokio::fs::File;
use super::*; use super::*;
use crate::test_utils::*; use crate::{preproc::loop_adapt, test_utils::*};
use pretty_assertions::assert_eq;
fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result<Vec<u8>> { #[async_recursion::async_recursion]
use ::zip::write::FileOptions; async fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result<Vec<u8>> {
use std::io::Write; let v = Vec::new();
let mut cursor = std::io::Cursor::new(v);
let mut zip = ZipFileWriter::new(&mut cursor);
// We use a buffer here, though you'd normally use a `File` let options = ZipEntryBuilder::new(fname.to_string(), Compression::Stored);
let mut zip = ::zip::ZipWriter::new(std::io::Cursor::new(Vec::new())); zip.write_entry_whole(options, content.as_bytes()).await?;
let options = FileOptions::default().compression_method(::zip::CompressionMethod::Stored);
zip.start_file(fname, options)?;
zip.write(content.as_bytes())?;
if add_inner { if add_inner {
zip.start_file("inner.zip", options)?; let opts = ZipEntryBuilder::new("inner.zip".to_string(), Compression::Stored);
zip.write(&create_zip("inner.txt", "inner text file", false)?)?; zip.write_entry_whole(
opts,
&create_zip("inner.txt", "inner text file", false).await?,
)
.await?;
} }
// Apply the changes you've made. zip.close().await?;
// Dropping the `ZipWriter` will have the same effect, but may silently fail Ok(cursor.into_inner())
Ok(zip.finish()?.into_inner())
} }
#[test] #[tokio::test]
fn only_seek_zip() -> Result<()> { async fn only_seek_zip() -> Result<()> {
let zip = test_data_dir().join("only-seek-zip.zip"); let zip = test_data_dir().join("only-seek-zip.zip");
let (a, d) = simple_adapt_info(&zip, Box::pin(File::open(&zip).await?));
let v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a)?).await?;
assert_eq!(String::from_utf8(v)?, "");
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn recurse() -> Result<()> { async fn recurse() -> Result<()> {
let zipfile = create_zip("outer.txt", "outer text file", true)?; let zipfile = create_zip("outer.txt", "outer text file", true).await?;
let adapter: Box<dyn FileAdapter> = Box::new(ZipAdapter::new()); let adapter = ZipAdapter::new();
let (a, d) = simple_adapt_info( let (a, d) = simple_adapt_info(
&PathBuf::from("outer.zip"), &PathBuf::from("outer.zip"),
Box::new(std::io::Cursor::new(zipfile)), Box::pin(std::io::Cursor::new(zipfile)),
); );
let buf = adapted_to_vec(adapter.adapt(a, &d)?)?; let buf = adapted_to_vec(loop_adapt(&adapter, d, a)?).await?;
assert_eq!( assert_eq!(
String::from_utf8(buf)?, String::from_utf8(buf)?,