From 2e1c74909e89c2d695aa770b482d62ac6b3fdfa2 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sun, 25 Dec 2022 18:05:58 +0100 Subject: [PATCH] remove one level of abstraction from spawning adapters --- src/adapters/custom.rs | 195 ++++++++++++++++++++++++++++++++++++--- src/adapters/postproc.rs | 5 +- src/adapters/spawning.rs | 185 ------------------------------------- src/expand.rs | 10 +- 4 files changed, 189 insertions(+), 206 deletions(-) diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index bd5397e..6e762e4 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -1,14 +1,27 @@ -use super::{ - spawning::{SpawningFileAdapter, SpawningFileAdapterTrait}, - AdapterMeta, GetMetadata, +use super::*; +use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; +use crate::{ + adapted_iter::AdaptedFilesIterBox, + expand::expand_str_ez, + matching::{FastFileMatcher, FileMatcher}, }; -use crate::{matching::{FastFileMatcher, FileMatcher}, expand::expand_str_ez}; use anyhow::Result; +use async_stream::{stream, AsyncStream}; +use bytes::{Buf, Bytes}; use lazy_static::lazy_static; +use log::debug; +use log::*; use regex::{Captures, Regex}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::path::Path; +use tokio::process::Command; +use tokio_util::io::StreamReader; + +use std::future::Future; +use std::process::{ExitStatus, Stdio}; +use tokio::io::AsyncReadExt; +use tokio::process::Child; // mostly the same as AdapterMeta + SpawningFileAdapter #[derive(Debug, Deserialize, Serialize, JsonSchema, Default, PartialEq, Clone)] @@ -33,7 +46,11 @@ pub struct CustomAdapterConfig { /// {}: the file path (TODO) /// stdin of the program will be connected to the input file, and stdout is assumed to be the converted file pub args: Vec, - // TODO: make more flexible for inner matching (e.g. foo.tar.gz should be foo.tar after gunzipping) + /// The output path hint. + /// + /// If not set, defaults to ${input_path}.txt + /// + /// TODO: make more flexible for inner matching (e.g. foo.tar.gz should be foo.tar after gunzipping) pub output_path_hint: Option, } @@ -94,7 +111,8 @@ lazy_static! { "--markdown-headings=atx" ]), disabled_by_default: None, - match_only_by_mime: None + match_only_by_mime: None, + output_path_hint: None }, CustomAdapterConfig { name: "poppler".to_owned(), @@ -108,12 +126,72 @@ lazy_static! { binary: "pdftotext".to_string(), args: strs(&["-", "-"]), disabled_by_default: None, - match_only_by_mime: None + match_only_by_mime: None, + output_path_hint: Some("${input_path}.txt.asciipagebreaks".into()) // postprocessors: [{name: "add_page_numbers_by_pagebreaks"}] } ]; } +/// replace a Command.spawn() error "File not found" with a more readable error +/// to indicate some program is not installed +pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { + use std::io::ErrorKind::*; + match err.kind() { + NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help), + _ => Error::from(err), + } +} + +/** waits for a process to finish, returns an io error if the process failed */ +struct ProcWaitReader { + process: Option, + future: Option>>>>, +} +impl ProcWaitReader { + fn new(cmd: Child) -> ProcWaitReader { + ProcWaitReader { + process: Some(cmd), + future: None, + } + } +} +fn proc_wait(mut child: Child) -> impl AsyncRead { + let s = stream! { + let res = child.wait().await?; + if res.success() { + yield std::io::Result::Ok(Bytes::new()); + } else { + yield std::io::Result::Err(std::io::Error::new( + std::io::ErrorKind::Other, + format_err!("subprocess failed: {:?}", res), + )); + } + }; + StreamReader::new(s) +} +pub fn pipe_output<'a>( + _line_prefix: &str, + mut cmd: Command, + inp: ReadBox, + exe_name: &str, + help: &str, +) -> Result { + let mut cmd = cmd + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .map_err(|e| map_exe_error(e, exe_name, help))?; + let mut stdi = cmd.stdin.take().expect("is piped"); + let stdo = cmd.stdout.take().expect("is piped"); + + tokio::spawn(async move { + let mut z = inp; + tokio::io::copy(&mut z, &mut stdi).await.unwrap(); + }); + Ok(Box::pin(stdo.chain(proc_wait(cmd)))) +} + pub struct CustomSpawningFileAdapter { binary: String, args: Vec, @@ -127,13 +205,14 @@ impl GetMetadata for CustomSpawningFileAdapter { } fn arg_replacer(arg: &str, filepath_hint: &Path) -> Result { Ok(expand_str_ez(arg, |s| match s { - "file_extension" => &filepath_hint + "file_extension" => filepath_hint .extension() .map(|e| e.to_string_lossy()) .unwrap_or("".into()), + _ => panic!("unknown replacer"), })) } -impl SpawningFileAdapterTrait for CustomSpawningFileAdapter { +impl CustomSpawningFileAdapter { fn get_exe(&self) -> &str { &self.binary } @@ -152,12 +231,53 @@ impl SpawningFileAdapterTrait for CustomSpawningFileAdapter { Ok(command) } } +impl FileAdapter for CustomSpawningFileAdapter { + fn adapt<'a>( + &self, + ai: AdaptInfo, + _detection_reason: &FileMatcher, + ) -> Result { + let AdaptInfo { + filepath_hint, + inp, + line_prefix, + archive_recursion_depth, + postprocess, + config, + .. + } = ai; + + let cmd = Command::new(&self.binary); + let cmd = self + .command(&filepath_hint, cmd) + .with_context(|| format!("Could not set cmd arguments for {}", self.binary))?; + debug!("executing {:?}", cmd); + let output = pipe_output(&line_prefix, cmd, inp, &self.binary, "")?; + Ok(Box::pin(tokio_stream::once(AdaptInfo { + filepath_hint: PathBuf::from(expand_str_ez( + self.output_path_hint + .as_deref() + .unwrap_or("${input_path}.txt"), + |r| match r { + "input_path" => filepath_hint.to_string_lossy(), + _ => panic!("unknown replacer in output_path_hint"), + }, + )), + inp: output, + line_prefix, + is_real_file: false, + archive_recursion_depth, + postprocess, + config, + }))) + } +} impl CustomAdapterConfig { - pub fn to_adapter(&self) -> SpawningFileAdapter { - let ad = CustomSpawningFileAdapter { + pub fn to_adapter(&self) -> CustomSpawningFileAdapter { + CustomSpawningFileAdapter { binary: self.binary.clone(), args: self.args.clone(), - output_path_hint: self.output_path_hint, + output_path_hint: self.output_path_hint.clone(), meta: AdapterMeta { name: self.name.clone(), version: self.version, @@ -182,8 +302,7 @@ impl CustomAdapterConfig { keep_fast_matchers_if_accurate: !self.match_only_by_mime.unwrap_or(false), disabled_by_default: self.disabled_by_default.unwrap_or(false), }, - }; - SpawningFileAdapter::new(Box::new(ad)) + } } } @@ -223,4 +342,52 @@ PREFIX:\u{c} ); Ok(()) } + + use std::io::Cursor; + + use super::*; + use crate::adapters::FileAdapter; + use crate::{ + adapters::custom::CustomAdapterConfig, + test_utils::{adapted_to_vec, simple_adapt_info}, + }; + + #[tokio::test] + async fn streaming() -> anyhow::Result<()> { + // an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it) + let adapter = CustomAdapterConfig { + name: "simple text replacer".to_string(), + description: "oo".to_string(), + disabled_by_default: None, + version: 1, + extensions: vec!["txt".to_string()], + mimetypes: None, + match_only_by_mime: None, + binary: "sed".to_string(), + args: vec!["s/e/u/g".to_string()], + }; + + let adapter = adapter.to_adapter(); + let input = r#" + This is the story of a + very strange lorry + with a long dead crew + and a witch with the flu + "#; + let input = format!("{0}{0}{0}{0}", input); + let input = format!("{0}{0}{0}{0}", input); + let input = format!("{0}{0}{0}{0}", input); + let input = format!("{0}{0}{0}{0}", input); + let input = format!("{0}{0}{0}{0}", input); + let input = format!("{0}{0}{0}{0}", input); + let (a, d) = simple_adapt_info( + &Path::new("foo.txt"), + Box::pin(Cursor::new(Vec::from(input))), + ); + let output = adapter.adapt(a, &d).unwrap(); + + let oup = adapted_to_vec(output).await?; + println!("output: {}", String::from_utf8_lossy(&oup)); + Ok(()) + } } diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 7915b87..c27cf6b 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -144,7 +144,6 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As Box::pin(StreamReader::new(oup_stream)) } - pub struct PostprocPageBreaks {} impl GetMetadata for PostprocPageBreaks { fn metadata(&self) -> &super::AdapterMeta { @@ -154,7 +153,7 @@ impl GetMetadata for PostprocPageBreaks { version: 1, description: "Adds the page number to each line for an input file that specifies page breaks as ascii page break character".to_owned(), recurses: false, - fast_matchers: vec![FastFileMatcher::FileExtension("txtwithpagebreaks".to_string())], + fast_matchers: vec![FastFileMatcher::FileExtension("asciipagebreaks".to_string())], slow_matchers: None, keep_fast_matchers_if_accurate: false, disabled_by_default: false @@ -221,9 +220,9 @@ pub fn postproc_pagebreaks( mod tests { use super::*; use anyhow::Result; + use tokio::pin; use tokio_test::io::Builder; use tokio_test::io::Mock; - use tokio::pin; #[tokio::test] async fn test_with_pagebreaks() { diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs index 50ea56c..8b13789 100644 --- a/src/adapters/spawning.rs +++ b/src/adapters/spawning.rs @@ -1,186 +1 @@ -use super::*; -use anyhow::Result; -use async_stream::{stream, AsyncStream}; -use bytes::{Buf, Bytes}; -use log::*; -use tokio_util::io::StreamReader; -use crate::adapters::FileAdapter; -use crate::expand::expand_str_ez; -use std::future::Future; -use std::path::Path; -use std::process::{ExitStatus, Stdio}; -use tokio::io::AsyncReadExt; -use tokio::process::{Child, Command}; - -// TODO: don't separate the trait and the struct -pub trait SpawningFileAdapterTrait: GetMetadata + Send + Sync { - fn get_exe(&self) -> &str; - fn command(&self, filepath_hint: &Path, command: Command) -> Result; -} - -pub struct SpawningFileAdapter { - inner: Box, -} - -impl SpawningFileAdapter { - pub fn new(inner: Box) -> SpawningFileAdapter { - SpawningFileAdapter { inner } - } -} - -impl GetMetadata for SpawningFileAdapter { - fn metadata(&self) -> &AdapterMeta { - self.inner.metadata() - } -} - - -/// replace a Command.spawn() error "File not found" with a more readable error -/// to indicate some program is not installed -pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { - use std::io::ErrorKind::*; - match err.kind() { - NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help), - _ => Error::from(err), - } -} - -/** waits for a process to finish, returns an io error if the process failed */ -struct ProcWaitReader { - process: Option, - future: Option>>>>, -} -impl ProcWaitReader { - fn new(cmd: Child) -> ProcWaitReader { - ProcWaitReader { - process: Some(cmd), - future: None, - } - } -} -fn proc_wait(mut child: Child) -> impl AsyncRead { - let s = stream! { - let res = child.wait().await?; - if res.success() { - yield std::io::Result::Ok(Bytes::new()); - } else { - yield std::io::Result::Err(std::io::Error::new( - std::io::ErrorKind::Other, - format_err!("subprocess failed: {:?}", res), - )); - } - }; - StreamReader::new(s) -} -pub fn pipe_output<'a>( - _line_prefix: &str, - mut cmd: Command, - inp: ReadBox, - exe_name: &str, - help: &str, -) -> Result { - let mut cmd = cmd - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map_err(|e| map_exe_error(e, exe_name, help))?; - let mut stdi = cmd.stdin.take().expect("is piped"); - let stdo = cmd.stdout.take().expect("is piped"); - - tokio::spawn(async move { - let mut z = inp; - tokio::io::copy(&mut z, &mut stdi).await.unwrap(); - }); - Ok(Box::pin(stdo.chain(proc_wait(cmd)))) -} - -impl FileAdapter for SpawningFileAdapter { - fn adapt<'a>( - &self, - ai: AdaptInfo, - _detection_reason: &FileMatcher, - ) -> Result { - let AdaptInfo { - filepath_hint, - inp, - line_prefix, - archive_recursion_depth, - postprocess, - config, - .. - } = ai; - - let cmd = Command::new(self.inner.get_exe()); - let cmd = self - .inner - .command(&filepath_hint, cmd) - .with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?; - debug!("executing {:?}", cmd); - let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?; - Ok(Box::pin(tokio_stream::once(AdaptInfo { - filepath_hint: PathBuf::from( - expand_str_ez(self.inner.output_path_hint, |r| match r { - "fullname" => &filepath_hint.to_string_lossy() - } - )), - inp: output, - line_prefix, - is_real_file: false, - archive_recursion_depth, - postprocess, - config, - }))) - } -} - -#[cfg(test)] -mod test { - use std::io::Cursor; - - use super::*; - use crate::adapters::FileAdapter; - use crate::{ - adapters::custom::CustomAdapterConfig, - test_utils::{adapted_to_vec, simple_adapt_info}, - }; - - #[tokio::test] - async fn streaming() -> anyhow::Result<()> { - // an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it) - let adapter = CustomAdapterConfig { - name: "simple text replacer".to_string(), - description: "oo".to_string(), - disabled_by_default: None, - version: 1, - extensions: vec!["txt".to_string()], - mimetypes: None, - match_only_by_mime: None, - binary: "sed".to_string(), - args: vec!["s/e/u/g".to_string()], - }; - - let adapter = adapter.to_adapter(); - let input = r#" - This is the story of a - very strange lorry - with a long dead crew - and a witch with the flu - "#; - let input = format!("{0}{0}{0}{0}", input); - let input = format!("{0}{0}{0}{0}", input); - let input = format!("{0}{0}{0}{0}", input); - let input = format!("{0}{0}{0}{0}", input); - let input = format!("{0}{0}{0}{0}", input); - let input = format!("{0}{0}{0}{0}", input); - let (a, d) = simple_adapt_info( - &Path::new("foo.txt"), - Box::pin(Cursor::new(Vec::from(input))), - ); - let output = adapter.adapt(a, &d).unwrap(); - - let oup = adapted_to_vec(output).await?; - println!("output: {}", String::from_utf8_lossy(&oup)); - Ok(()) - } -} diff --git a/src/expand.rs b/src/expand.rs index f2ee294..ca5220f 100644 --- a/src/expand.rs +++ b/src/expand.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use regex::Captures; // from https://github.com/phiresky/timetrackrs/blob/1c3df09ba2c1fda6065f2927045bd28dea0738d3/src/expand.rs @@ -27,7 +29,7 @@ pub fn get_capture<'a>(caps: &'a [Captures], reference: &str) -> Option<&'a str> pub fn expand_str_captures(caps: &[Captures], replacement: &str) -> String { let mut dst = String::new(); expand_str_lambda( - |reference: &str| get_capture(caps, reference).unwrap_or(""), + |reference: &str| Cow::Borrowed(get_capture(caps, reference).unwrap_or("")), replacement, &mut dst, ); @@ -36,7 +38,7 @@ pub fn expand_str_captures(caps: &[Captures], replacement: &str) -> String { pub fn expand_str_ez<'a, F>(replacement: &'a str, lambda: F) -> String where - F: Fn(&str) -> &'a str, + F: Fn(&str) -> Cow<'a, str>, { let mut dst = String::new(); expand_str_lambda(lambda, replacement, &mut dst); @@ -45,7 +47,7 @@ where pub fn expand_str_lambda<'a, F>(cap: F, replacement: &'a str, dst: &mut String) where - F: Fn(&str) -> &'a str, + F: Fn(&str) -> Cow<'a, str>, { let mut replacement = replacement; while !replacement.is_empty() { @@ -71,7 +73,7 @@ where } }; replacement = &replacement[cap_ref.end..]; - dst.push_str(cap(cap_ref.cap)); + dst.push_str(cap(cap_ref.cap).as_ref()); } dst.push_str(replacement); }