From c4dbabaf102844b8b42dfb4f8dae8e2c1e849f4f Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 26 Dec 2022 18:58:17 +0100 Subject: [PATCH] reimplement tar adapter async --- Cargo.lock | 27 +++++----- Cargo.toml | 2 +- src/adapted_iter.rs | 8 ++- src/adapters.rs | 5 +- src/adapters/custom.rs | 5 +- src/adapters/decompress.rs | 6 ++- src/adapters/postproc.rs | 5 +- src/adapters/spawning.rs | 1 - src/adapters/tar.rs | 105 ++++++++++++++++++++++++------------- src/adapters/writing.rs | 50 ++++++------------ src/preproc.rs | 4 +- src/recurse.rs | 3 +- 12 files changed, 123 insertions(+), 98 deletions(-) delete mode 100644 src/adapters/spawning.rs diff --git a/Cargo.lock b/Cargo.lock index 18a5c9b..54fe77b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1326,10 +1326,10 @@ dependencies = [ "serde_json", "size_format", "structopt", - "tar", "tempfile", "tokio 1.23.0", "tokio-stream", + "tokio-tar", "tokio-test", "tokio-util", "tree_magic_mini", @@ -1600,17 +1600,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tar" -version = "0.4.38" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b55807c0344e1e6c04d7c965f5289c39a8d94ae23ed5c0b57aabac549f871c6" -dependencies = [ - "filetime", - "libc", - "xattr", -] - [[package]] name = "tempfile" version = "3.3.0" @@ -1778,6 +1767,20 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tar" +version = "0.3.0" +source = "git+https://github.com/vorot93/tokio-tar#1bd30fbd1a219e8982571da48eb68f34317d1e15" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall", + "tokio 1.23.0", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-test" version = "0.4.2" diff --git a/Cargo.toml b/Cargo.toml index 101f7f3..2b6c1db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,10 +46,10 @@ serde = {version = "1.0.115", features = ["derive"]} serde_json = "1.0.57" size_format = "1.0.2" structopt = "0.3.17" -tar = "0.4.30" tempfile = "3.1.0" tokio = {version = "1.21.2", features = ["full"]} tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]} +tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" } tokio-util = {version = "0.7.4", features = ["io", "full"]} tree_magic = {package = "tree_magic_mini", version = "3.0.0"} zip = "0.6.3" diff --git a/src/adapted_iter.rs b/src/adapted_iter.rs index 5fa209d..3fc1504 100644 --- a/src/adapted_iter.rs +++ b/src/adapted_iter.rs @@ -4,7 +4,11 @@ use tokio_stream::Stream; use crate::adapters::AdaptInfo; -pub trait AdaptedFilesIter: Stream + Send {} -impl AdaptedFilesIter for T where T: Stream + Send {} +pub trait AdaptedFilesIter: Stream> + Send {} +impl AdaptedFilesIter for T where T: Stream> + Send {} pub type AdaptedFilesIterBox = Pin>; + +pub fn one_file(ai: AdaptInfo) -> AdaptedFilesIterBox { + Box::pin(tokio_stream::once(Ok(ai))) +} diff --git a/src/adapters.rs b/src/adapters.rs index ab29664..09627d9 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -3,10 +3,9 @@ pub mod decompress; // pub mod ffmpeg; pub mod postproc; // pub mod pdfpages; -pub mod spawning; use std::sync::Arc; // pub mod sqlite; -// pub mod tar; +pub mod tar; // pub mod tesseract; // pub mod writing; // pub mod zip; @@ -118,7 +117,7 @@ pub fn get_all_adapters(custom_adapters: Option>) -> Ad //Rc::new(ffmpeg::FFmpegAdapter::new()), // Rc::new(zip::ZipAdapter::new()), Arc::new(decompress::DecompressAdapter::new()), - // Rc::new(tar::TarAdapter::new()), + Arc::new(tar::TarAdapter::new()), //Rc::new(sqlite::SqliteAdapter::new()), // Rc::new(pdfpages::PdfPagesAdapter::new()), // Rc::new(tesseract::TesseractAdapter::new()), diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 13f519c..73dfc98 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -1,5 +1,6 @@ use super::*; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; +use crate::adapted_iter::one_file; use crate::{ adapted_iter::AdaptedFilesIterBox, expand::expand_str_ez, @@ -234,7 +235,7 @@ impl FileAdapter for CustomSpawningFileAdapter { .with_context(|| format!("Could not set cmd arguments for {}", self.binary))?; debug!("executing {:?}", cmd); let output = pipe_output(&line_prefix, cmd, inp, &self.binary, "")?; - Ok(Box::pin(tokio_stream::once(AdaptInfo { + Ok(one_file(AdaptInfo { filepath_hint: PathBuf::from(expand_str_ez( self.output_path_hint .as_deref() @@ -250,7 +251,7 @@ impl FileAdapter for CustomSpawningFileAdapter { archive_recursion_depth: archive_recursion_depth + 1, postprocess, config, - }))) + })) } } impl CustomAdapterConfig { diff --git a/src/adapters/decompress.rs b/src/adapters/decompress.rs index 0ba70fa..94cab7c 100644 --- a/src/adapters/decompress.rs +++ b/src/adapters/decompress.rs @@ -1,3 +1,5 @@ +use crate::adapted_iter::one_file; + use super::*; use anyhow::Result; @@ -93,7 +95,7 @@ fn get_inner_filename(filename: &Path) -> PathBuf { impl FileAdapter for DecompressAdapter { fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result { - Ok(Box::pin(tokio_stream::once(AdaptInfo { + Ok(one_file(AdaptInfo { filepath_hint: get_inner_filename(&ai.filepath_hint), is_real_file: false, archive_recursion_depth: ai.archive_recursion_depth + 1, @@ -101,7 +103,7 @@ impl FileAdapter for DecompressAdapter { line_prefix: ai.line_prefix, config: ai.config.clone(), postprocess: ai.postprocess, - }))) + })) } } diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 844a435..9fd6841 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -13,6 +13,7 @@ use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; +use crate::adapted_iter::one_file; use crate::adapted_iter::AdaptedFilesIterBox; use crate::matching::FastFileMatcher; @@ -56,7 +57,7 @@ impl FileAdapter for PostprocPrefix { postprocess: false, ..a }; - Ok(Box::pin(tokio_stream::once(ai))) + Ok(one_file(ai)) } } @@ -183,7 +184,7 @@ impl FileAdapter for PostprocPageBreaks { .join(a.filepath_hint.file_stem().unwrap_or_default()), ..a }; - Ok(Box::pin(tokio_stream::once(ai))) + Ok(one_file(ai)) } } /// Adds the prefix "Page N:" to each line, diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs deleted file mode 100644 index 8b13789..0000000 --- a/src/adapters/spawning.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/adapters/tar.rs b/src/adapters/tar.rs index 0f08cfd..5b6c054 100644 --- a/src/adapters/tar.rs +++ b/src/adapters/tar.rs @@ -1,11 +1,20 @@ -use super::*; -use crate::{preproc::rga_preproc, print_bytes}; -use ::tar::EntryType::Regular; +use crate::{ + adapted_iter::AdaptedFilesIterBox, + adapters::AdapterMeta, + matching::{FastFileMatcher, FileMatcher}, + preproc::rga_preproc, + print_bytes, +}; use anyhow::*; +use async_stream::stream; use lazy_static::lazy_static; use log::*; use std::path::PathBuf; -use writing::{WritingFileAdapter, WritingFileAdapterTrait}; +use tokio::io::AsyncWrite; +use tokio_stream::StreamExt; +use tokio_util::io::StreamReader; + +use super::{AdaptInfo, FileAdapter, GetMetadata}; static EXTENSIONS: &[&str] = &["tar"]; @@ -28,8 +37,8 @@ lazy_static! { pub struct TarAdapter; impl TarAdapter { - pub fn new() -> WritingFileAdapter { - WritingFileAdapter::new(Box::new(TarAdapter)) + pub fn new() -> TarAdapter { + TarAdapter } } impl GetMetadata for TarAdapter { @@ -38,45 +47,67 @@ impl GetMetadata for TarAdapter { } } -impl WritingFileAdapterTrait for TarAdapter { - fn adapt_write( - &self, - ai: AdaptInfo, - _detection_reason: &FileMatcher, - oup: &mut dyn Write, - ) -> Result<()> { +impl FileAdapter for TarAdapter { + fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result { let AdaptInfo { filepath_hint, - mut inp, + inp, line_prefix, archive_recursion_depth, config, + postprocess, .. } = ai; - let mut archive = ::tar::Archive::new(&mut inp); - for entry in archive.entries()? { - let mut file = entry?; - if Regular == file.header().entry_type() { - let path = PathBuf::from(file.path()?.to_owned()); - debug!( - "{}|{}: {}", - filepath_hint.display(), - path.display(), - print_bytes(file.header().size()? as f64), - ); - let line_prefix = &format!("{}{}: ", line_prefix, path.display()); - let ai2: AdaptInfo = AdaptInfo { - filepath_hint: path, - is_real_file: false, - archive_recursion_depth: archive_recursion_depth + 1, - inp: Box::new(file), - oup, - line_prefix, - config: config.clone(), - }; - rga_preproc(ai2)?; + let mut archive = ::tokio_tar::Archive::new(inp); + + let mut entries = archive.entries()?; + let s = stream! { + while let Some(entry) = entries.next().await { + let mut file = entry?; + if tokio_tar::EntryType::Regular == file.header().entry_type() { + let path = PathBuf::from(file.path()?.to_owned()); + debug!( + "{}|{}: {}", + filepath_hint.display(), + path.display(), + print_bytes(file.header().size().unwrap_or(0) as f64), + ); + let line_prefix = &format!("{}{}: ", line_prefix, path.display()); + let ai2: AdaptInfo = AdaptInfo { + filepath_hint: path, + is_real_file: false, + archive_recursion_depth: archive_recursion_depth + 1, + inp: Box::pin(file), + line_prefix: line_prefix.to_string(), + config: config.clone(), + postprocess, + }; + yield Ok(ai2); + } } - } + }; + + Ok(Box::pin(s)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::*; + use pretty_assertions::assert_eq; + use tokio::fs::File; + + #[tokio::test] + async fn test_simple_tar() -> Result<()> { + let filepath = test_data_dir().join("test.tar"); + + let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); + + let adapter = TarAdapter::new(); + let r = adapter.adapt(a, &d)?; + let o = adapted_to_vec(r).await?; + assert_eq!(String::from_utf8(o)?, "hello\n"); Ok(()) } } diff --git a/src/adapters/writing.rs b/src/adapters/writing.rs index 132f7af..95dc0c0 100644 --- a/src/adapters/writing.rs +++ b/src/adapters/writing.rs @@ -1,37 +1,19 @@ use super::{FileAdapter, GetMetadata, ReadBox}; use anyhow::Result; -use std::io::Read; -use std::io::Write; -use std::thread::Thread; +use tokio::io::AsyncWrite; +// use async_trait::async_trait; -// this trait / struct split is ugly but necessary because of "conflicting trait implementation" otherwise with SpawningFileAdapter -#[dyn_clonable::clonable] -pub trait WritingFileAdapterTrait: GetMetadata + Send + Clone { - fn adapt_write<'a>( +pub trait WritingFileAdapter: GetMetadata + Send + Clone { + fn adapt_write( &self, - a: super::AdaptInfo<'a>, + a: super::AdaptInfo, detection_reason: &crate::matching::FileMatcher, - oup: &mut (dyn Write + 'a), + oup: &mut (dyn AsyncWrite), ) -> Result<()>; } -pub struct WritingFileAdapter { - inner: Box, -} -impl WritingFileAdapter { - pub fn new(inner: Box) -> WritingFileAdapter { - WritingFileAdapter { inner } - } -} - -impl GetMetadata for WritingFileAdapter { - fn metadata(&self) -> &super::AdapterMeta { - self.inner.metadata() - } -} - -struct PipedReadWriter<'a> { - inner: ReadBox<'a>, +/* struct PipedReadWriter { + inner: ReadBox, pipe_thread: Thread, } @@ -39,18 +21,20 @@ impl<'a> Read for PipedReadWriter<'a> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { todo!() } -} +}*/ impl FileAdapter for WritingFileAdapter { - fn adapt<'a>( + fn adapt( &self, - ai_outer: super::AdaptInfo<'a>, + ai_outer: super::AdaptInfo, detection_reason: &crate::matching::FileMatcher, - ) -> anyhow::Result> { + ) -> anyhow::Result { let (r, w) = crate::pipe::pipe(); let cc = self.inner.clone(); let detc = detection_reason.clone(); - std::thread::spawn(move || { + panic!("ooo"); + // cc.adapt_write(ai_outer, detc, ) + /*tokio::spawn(move || { let mut oup = w; let ai = ai_outer; let res = cc.adapt_write(ai, &detc, &mut oup); @@ -58,8 +42,8 @@ impl FileAdapter for WritingFileAdapter { oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e)) .expect("could not write err"); } - }); + }); */ - Ok(Box::new(r)) + //Ok(Box::new(r)) } } diff --git a/src/preproc.rs b/src/preproc.rs index 6a8cb57..156a645 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -228,7 +228,7 @@ pub fn loop_adapt( })?; let s = stream! { for await file in inp { - match buf_choose_adapter(file).await.expect("todo: handle") { + match buf_choose_adapter(file?).await.expect("todo: handle") { Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => { debug!( "Chose adapter '{}' because of matcher {:?}", @@ -245,7 +245,7 @@ pub fn loop_adapt( } Ret::Passthrough(ai) => { debug!("no adapter for {}, ending recursion", ai.filepath_hint.to_string_lossy()); - yield ai; + yield Ok(ai); } } } diff --git a/src/recurse.rs b/src/recurse.rs index 5ee9eaa..1772876 100644 --- a/src/recurse.rs +++ b/src/recurse.rs @@ -6,7 +6,8 @@ use async_stream::stream; pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox { let s = stream! { for await output in input { - let stream = ReaderStream::new(output.inp); + let o = output.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?.inp; + let stream = ReaderStream::new(o); for await bytes in stream { yield bytes; }