reimplement tar adapter async

This commit is contained in:
phiresky 2022-12-26 18:58:17 +01:00
parent 9ce11656d1
commit c4dbabaf10
12 changed files with 123 additions and 98 deletions

27
Cargo.lock generated
View File

@ -1326,10 +1326,10 @@ dependencies = [
"serde_json", "serde_json",
"size_format", "size_format",
"structopt", "structopt",
"tar",
"tempfile", "tempfile",
"tokio 1.23.0", "tokio 1.23.0",
"tokio-stream", "tokio-stream",
"tokio-tar",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"tree_magic_mini", "tree_magic_mini",
@ -1600,17 +1600,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tar"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b55807c0344e1e6c04d7c965f5289c39a8d94ae23ed5c0b57aabac549f871c6"
dependencies = [
"filetime",
"libc",
"xattr",
]
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.3.0" version = "3.3.0"
@ -1778,6 +1767,20 @@ dependencies = [
"tokio-util", "tokio-util",
] ]
[[package]]
name = "tokio-tar"
version = "0.3.0"
source = "git+https://github.com/vorot93/tokio-tar#1bd30fbd1a219e8982571da48eb68f34317d1e15"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall",
"tokio 1.23.0",
"tokio-stream",
"xattr",
]
[[package]] [[package]]
name = "tokio-test" name = "tokio-test"
version = "0.4.2" version = "0.4.2"

View File

@ -46,10 +46,10 @@ serde = {version = "1.0.115", features = ["derive"]}
serde_json = "1.0.57" serde_json = "1.0.57"
size_format = "1.0.2" size_format = "1.0.2"
structopt = "0.3.17" structopt = "0.3.17"
tar = "0.4.30"
tempfile = "3.1.0" tempfile = "3.1.0"
tokio = {version = "1.21.2", features = ["full"]} tokio = {version = "1.21.2", features = ["full"]}
tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]} tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]}
tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" }
tokio-util = {version = "0.7.4", features = ["io", "full"]} tokio-util = {version = "0.7.4", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.0.0"} tree_magic = {package = "tree_magic_mini", version = "3.0.0"}
zip = "0.6.3" zip = "0.6.3"

View File

@ -4,7 +4,11 @@ use tokio_stream::Stream;
use crate::adapters::AdaptInfo; use crate::adapters::AdaptInfo;
pub trait AdaptedFilesIter: Stream<Item = AdaptInfo> + Send {} pub trait AdaptedFilesIter: Stream<Item = anyhow::Result<AdaptInfo>> + Send {}
impl<T> AdaptedFilesIter for T where T: Stream<Item = AdaptInfo> + Send {} impl<T> AdaptedFilesIter for T where T: Stream<Item = anyhow::Result<AdaptInfo>> + Send {}
pub type AdaptedFilesIterBox = Pin<Box<dyn AdaptedFilesIter>>; pub type AdaptedFilesIterBox = Pin<Box<dyn AdaptedFilesIter>>;
pub fn one_file(ai: AdaptInfo) -> AdaptedFilesIterBox {
Box::pin(tokio_stream::once(Ok(ai)))
}

View File

@ -3,10 +3,9 @@ pub mod decompress;
// pub mod ffmpeg; // pub mod ffmpeg;
pub mod postproc; pub mod postproc;
// pub mod pdfpages; // pub mod pdfpages;
pub mod spawning;
use std::sync::Arc; use std::sync::Arc;
// pub mod sqlite; // pub mod sqlite;
// pub mod tar; pub mod tar;
// pub mod tesseract; // pub mod tesseract;
// pub mod writing; // pub mod writing;
// pub mod zip; // pub mod zip;
@ -118,7 +117,7 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
//Rc::new(ffmpeg::FFmpegAdapter::new()), //Rc::new(ffmpeg::FFmpegAdapter::new()),
// Rc::new(zip::ZipAdapter::new()), // Rc::new(zip::ZipAdapter::new()),
Arc::new(decompress::DecompressAdapter::new()), Arc::new(decompress::DecompressAdapter::new()),
// Rc::new(tar::TarAdapter::new()), Arc::new(tar::TarAdapter::new()),
//Rc::new(sqlite::SqliteAdapter::new()), //Rc::new(sqlite::SqliteAdapter::new()),
// Rc::new(pdfpages::PdfPagesAdapter::new()), // Rc::new(pdfpages::PdfPagesAdapter::new()),
// Rc::new(tesseract::TesseractAdapter::new()), // Rc::new(tesseract::TesseractAdapter::new()),

View File

@ -1,5 +1,6 @@
use super::*; use super::*;
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
use crate::adapted_iter::one_file;
use crate::{ use crate::{
adapted_iter::AdaptedFilesIterBox, adapted_iter::AdaptedFilesIterBox,
expand::expand_str_ez, expand::expand_str_ez,
@ -234,7 +235,7 @@ impl FileAdapter for CustomSpawningFileAdapter {
.with_context(|| format!("Could not set cmd arguments for {}", self.binary))?; .with_context(|| format!("Could not set cmd arguments for {}", self.binary))?;
debug!("executing {:?}", cmd); debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, inp, &self.binary, "")?; let output = pipe_output(&line_prefix, cmd, inp, &self.binary, "")?;
Ok(Box::pin(tokio_stream::once(AdaptInfo { Ok(one_file(AdaptInfo {
filepath_hint: PathBuf::from(expand_str_ez( filepath_hint: PathBuf::from(expand_str_ez(
self.output_path_hint self.output_path_hint
.as_deref() .as_deref()
@ -250,7 +251,7 @@ impl FileAdapter for CustomSpawningFileAdapter {
archive_recursion_depth: archive_recursion_depth + 1, archive_recursion_depth: archive_recursion_depth + 1,
postprocess, postprocess,
config, config,
}))) }))
} }
} }
impl CustomAdapterConfig { impl CustomAdapterConfig {

View File

@ -1,3 +1,5 @@
use crate::adapted_iter::one_file;
use super::*; use super::*;
use anyhow::Result; use anyhow::Result;
@ -93,7 +95,7 @@ fn get_inner_filename(filename: &Path) -> PathBuf {
impl FileAdapter for DecompressAdapter { impl FileAdapter for DecompressAdapter {
fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> { fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
Ok(Box::pin(tokio_stream::once(AdaptInfo { Ok(one_file(AdaptInfo {
filepath_hint: get_inner_filename(&ai.filepath_hint), filepath_hint: get_inner_filename(&ai.filepath_hint),
is_real_file: false, is_real_file: false,
archive_recursion_depth: ai.archive_recursion_depth + 1, archive_recursion_depth: ai.archive_recursion_depth + 1,
@ -101,7 +103,7 @@ impl FileAdapter for DecompressAdapter {
line_prefix: ai.line_prefix, line_prefix: ai.line_prefix,
config: ai.config.clone(), config: ai.config.clone(),
postprocess: ai.postprocess, postprocess: ai.postprocess,
}))) }))
} }
} }

View File

@ -13,6 +13,7 @@ use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use crate::adapted_iter::one_file;
use crate::adapted_iter::AdaptedFilesIterBox; use crate::adapted_iter::AdaptedFilesIterBox;
use crate::matching::FastFileMatcher; use crate::matching::FastFileMatcher;
@ -56,7 +57,7 @@ impl FileAdapter for PostprocPrefix {
postprocess: false, postprocess: false,
..a ..a
}; };
Ok(Box::pin(tokio_stream::once(ai))) Ok(one_file(ai))
} }
} }
@ -183,7 +184,7 @@ impl FileAdapter for PostprocPageBreaks {
.join(a.filepath_hint.file_stem().unwrap_or_default()), .join(a.filepath_hint.file_stem().unwrap_or_default()),
..a ..a
}; };
Ok(Box::pin(tokio_stream::once(ai))) Ok(one_file(ai))
} }
} }
/// Adds the prefix "Page N:" to each line, /// Adds the prefix "Page N:" to each line,

View File

@ -1 +0,0 @@

View File

@ -1,11 +1,20 @@
use super::*; use crate::{
use crate::{preproc::rga_preproc, print_bytes}; adapted_iter::AdaptedFilesIterBox,
use ::tar::EntryType::Regular; adapters::AdapterMeta,
matching::{FastFileMatcher, FileMatcher},
preproc::rga_preproc,
print_bytes,
};
use anyhow::*; use anyhow::*;
use async_stream::stream;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
use std::path::PathBuf; use std::path::PathBuf;
use writing::{WritingFileAdapter, WritingFileAdapterTrait}; use tokio::io::AsyncWrite;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
use super::{AdaptInfo, FileAdapter, GetMetadata};
static EXTENSIONS: &[&str] = &["tar"]; static EXTENSIONS: &[&str] = &["tar"];
@ -28,8 +37,8 @@ lazy_static! {
pub struct TarAdapter; pub struct TarAdapter;
impl TarAdapter { impl TarAdapter {
pub fn new() -> WritingFileAdapter { pub fn new() -> TarAdapter {
WritingFileAdapter::new(Box::new(TarAdapter)) TarAdapter
} }
} }
impl GetMetadata for TarAdapter { impl GetMetadata for TarAdapter {
@ -38,45 +47,67 @@ impl GetMetadata for TarAdapter {
} }
} }
impl WritingFileAdapterTrait for TarAdapter { impl FileAdapter for TarAdapter {
fn adapt_write( fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
oup: &mut dyn Write,
) -> Result<()> {
let AdaptInfo { let AdaptInfo {
filepath_hint, filepath_hint,
mut inp, inp,
line_prefix, line_prefix,
archive_recursion_depth, archive_recursion_depth,
config, config,
postprocess,
.. ..
} = ai; } = ai;
let mut archive = ::tar::Archive::new(&mut inp); let mut archive = ::tokio_tar::Archive::new(inp);
for entry in archive.entries()? {
let mut entries = archive.entries()?;
let s = stream! {
while let Some(entry) = entries.next().await {
let mut file = entry?; let mut file = entry?;
if Regular == file.header().entry_type() { if tokio_tar::EntryType::Regular == file.header().entry_type() {
let path = PathBuf::from(file.path()?.to_owned()); let path = PathBuf::from(file.path()?.to_owned());
debug!( debug!(
"{}|{}: {}", "{}|{}: {}",
filepath_hint.display(), filepath_hint.display(),
path.display(), path.display(),
print_bytes(file.header().size()? as f64), print_bytes(file.header().size().unwrap_or(0) as f64),
); );
let line_prefix = &format!("{}{}: ", line_prefix, path.display()); let line_prefix = &format!("{}{}: ", line_prefix, path.display());
let ai2: AdaptInfo = AdaptInfo { let ai2: AdaptInfo = AdaptInfo {
filepath_hint: path, filepath_hint: path,
is_real_file: false, is_real_file: false,
archive_recursion_depth: archive_recursion_depth + 1, archive_recursion_depth: archive_recursion_depth + 1,
inp: Box::new(file), inp: Box::pin(file),
oup, line_prefix: line_prefix.to_string(),
line_prefix,
config: config.clone(), config: config.clone(),
postprocess,
}; };
rga_preproc(ai2)?; yield Ok(ai2);
} }
} }
};
Ok(Box::pin(s))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::*;
use pretty_assertions::assert_eq;
use tokio::fs::File;
#[tokio::test]
async fn test_simple_tar() -> Result<()> {
let filepath = test_data_dir().join("test.tar");
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let adapter = TarAdapter::new();
let r = adapter.adapt(a, &d)?;
let o = adapted_to_vec(r).await?;
assert_eq!(String::from_utf8(o)?, "hello\n");
Ok(()) Ok(())
} }
} }

View File

@ -1,37 +1,19 @@
use super::{FileAdapter, GetMetadata, ReadBox}; use super::{FileAdapter, GetMetadata, ReadBox};
use anyhow::Result; use anyhow::Result;
use std::io::Read; use tokio::io::AsyncWrite;
use std::io::Write; // use async_trait::async_trait;
use std::thread::Thread;
// this trait / struct split is ugly but necessary because of "conflicting trait implementation" otherwise with SpawningFileAdapter pub trait WritingFileAdapter: GetMetadata + Send + Clone {
#[dyn_clonable::clonable] fn adapt_write(
pub trait WritingFileAdapterTrait: GetMetadata + Send + Clone {
fn adapt_write<'a>(
&self, &self,
a: super::AdaptInfo<'a>, a: super::AdaptInfo,
detection_reason: &crate::matching::FileMatcher, detection_reason: &crate::matching::FileMatcher,
oup: &mut (dyn Write + 'a), oup: &mut (dyn AsyncWrite),
) -> Result<()>; ) -> Result<()>;
} }
pub struct WritingFileAdapter { /* struct PipedReadWriter {
inner: Box<dyn WritingFileAdapterTrait>, inner: ReadBox,
}
impl WritingFileAdapter {
pub fn new(inner: Box<dyn WritingFileAdapterTrait>) -> WritingFileAdapter {
WritingFileAdapter { inner }
}
}
impl GetMetadata for WritingFileAdapter {
fn metadata(&self) -> &super::AdapterMeta {
self.inner.metadata()
}
}
struct PipedReadWriter<'a> {
inner: ReadBox<'a>,
pipe_thread: Thread, pipe_thread: Thread,
} }
@ -39,18 +21,20 @@ impl<'a> Read for PipedReadWriter<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
todo!() todo!()
} }
} }*/
impl FileAdapter for WritingFileAdapter { impl FileAdapter for WritingFileAdapter {
fn adapt<'a>( fn adapt(
&self, &self,
ai_outer: super::AdaptInfo<'a>, ai_outer: super::AdaptInfo,
detection_reason: &crate::matching::FileMatcher, detection_reason: &crate::matching::FileMatcher,
) -> anyhow::Result<ReadBox<'a>> { ) -> anyhow::Result<ReadBox> {
let (r, w) = crate::pipe::pipe(); let (r, w) = crate::pipe::pipe();
let cc = self.inner.clone(); let cc = self.inner.clone();
let detc = detection_reason.clone(); let detc = detection_reason.clone();
std::thread::spawn(move || { panic!("ooo");
// cc.adapt_write(ai_outer, detc, )
/*tokio::spawn(move || {
let mut oup = w; let mut oup = w;
let ai = ai_outer; let ai = ai_outer;
let res = cc.adapt_write(ai, &detc, &mut oup); let res = cc.adapt_write(ai, &detc, &mut oup);
@ -58,8 +42,8 @@ impl FileAdapter for WritingFileAdapter {
oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e)) oup.write_err(std::io::Error::new(std::io::ErrorKind::Other, e))
.expect("could not write err"); .expect("could not write err");
} }
}); }); */
Ok(Box::new(r)) //Ok(Box::new(r))
} }
} }

View File

@ -228,7 +228,7 @@ pub fn loop_adapt(
})?; })?;
let s = stream! { let s = stream! {
for await file in inp { for await file in inp {
match buf_choose_adapter(file).await.expect("todo: handle") { match buf_choose_adapter(file?).await.expect("todo: handle") {
Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => { Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => {
debug!( debug!(
"Chose adapter '{}' because of matcher {:?}", "Chose adapter '{}' because of matcher {:?}",
@ -245,7 +245,7 @@ pub fn loop_adapt(
} }
Ret::Passthrough(ai) => { Ret::Passthrough(ai) => {
debug!("no adapter for {}, ending recursion", ai.filepath_hint.to_string_lossy()); debug!("no adapter for {}, ending recursion", ai.filepath_hint.to_string_lossy());
yield ai; yield Ok(ai);
} }
} }
} }

View File

@ -6,7 +6,8 @@ use async_stream::stream;
pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox { pub fn concat_read_streams(input: AdaptedFilesIterBox) -> ReadBox {
let s = stream! { let s = stream! {
for await output in input { for await output in input {
let stream = ReaderStream::new(output.inp); let o = output.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?.inp;
let stream = ReaderStream::new(o);
for await bytes in stream { for await bytes in stream {
yield bytes; yield bytes;
} }