From 54799f14528c35f6b2bfe5f05cb2e05e58ff5d10 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sun, 13 Nov 2022 00:31:25 +0100 Subject: [PATCH] partial recursion + postproc --- src/adapters.rs | 2 +- src/adapters/custom.rs | 4 +- src/adapters/postproc.rs | 191 +++++++++++++-------------------------- src/bin/rga.rs | 2 +- src/preproc.rs | 14 +-- 5 files changed, 74 insertions(+), 139 deletions(-) diff --git a/src/adapters.rs b/src/adapters.rs index 67d8aaa..3052d08 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -1,7 +1,7 @@ pub mod custom; // pub mod decompress; // pub mod ffmpeg; -// pub mod postproc; +pub mod postproc; // pub mod pdfpages; pub mod spawning; use std::sync::Arc; diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 43b49ea..3444f9d 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -216,6 +216,7 @@ impl CustomAdapterConfig { mod test { use super::super::FileAdapter; use super::*; + use crate::preproc::loop_adapt; use crate::test_utils::*; use anyhow::Result; use tokio::fs::File; @@ -232,7 +233,8 @@ mod test { let filepath = test_data_dir().join("short.pdf"); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); - let r = adapter.adapt(a, &d)?; + // let r = adapter.adapt(a, &d)?; + let r = loop_adapt(&adapter, d, a)?; let o = adapted_to_vec(r).await?; assert_eq!( String::from_utf8(o)?, diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 68c5b94..5251c9c 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -4,111 +4,24 @@ use anyhow::Context; use anyhow::Result; +use bytes::Bytes; use encoding_rs_io::DecodeReaderBytesBuilder; -use tokio::io::AsyncRead; - +use tokio::io::{AsyncRead, AsyncReadExt}; +use async_stream::stream; +use tokio_util::io::ReaderStream; +use tokio_util::io::StreamReader; +use std::io::Cursor; +use std::pin::Pin; use std::{ cmp::min, }; -use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter}; +use crate::adapted_iter::{AdaptedFilesIterBox}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; -/** pass through, except adding \n at the end */ -pub struct EnsureEndsWithNewline { - inner: R, - added_newline: bool, -} -impl EnsureEndsWithNewline { - pub fn new(r: R) -> EnsureEndsWithNewline { - EnsureEndsWithNewline { - inner: r, - added_newline: false, - } - } -} -impl AsyncRead for EnsureEndsWithNewline { - fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.inner.read(buf) { - Ok(0) => { - if self.added_newline { - Ok(0) - } else { - buf[0] = b'\n'; - self.added_newline = true; - Ok(1) - } - } - Ok(n) => Ok(n), - Err(e) => Err(e), - } - } -} -struct ByteReplacer -where - R:AsyncRead, -{ - inner: R, - next_read: Vec, - replacer: Box Vec>, - haystacker: Box Option>, -} - -impl ByteReplacer -where - R: AsyncRead, -{ - fn output_next(&mut self, buf: &mut [u8], buf_valid_until: usize, replacement: &[u8]) -> usize { - let after_part1 = Vec::from(&buf[1..buf_valid_until]); - - /*let mut after_part = Vec::with_capacity(replacement.len() + replaced_len); - after_part.extend_from_slice(replacement); - after_part.extend_from_slice(&buf[..replaced_len]);*/ - - let writeable_count = min(buf.len(), replacement.len()); - buf[..writeable_count].copy_from_slice(&replacement[0..writeable_count]); - - let after_rep = &replacement[writeable_count..]; - let mut ov = Vec::new(); - ov.extend_from_slice(&after_rep); - ov.extend_from_slice(&after_part1); - ov.extend_from_slice(&self.next_read); - self.next_read = ov; - - return writeable_count; - } -} - -impl AsyncRead for ByteReplacer -where - R: AsyncRead, -{ - fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result { - let read = if self.next_read.len() > 0 { - let count = std::cmp::min(self.next_read.len(), buf.len()); - buf[0..count].copy_from_slice(&self.next_read[0..count]); - self.next_read.drain(0..count).count(); - Ok(count) - } else { - self.inner.read(buf) - }; - - match read { - Ok(u) => { - match (self.haystacker)(&buf[0..u]) { - Some(i) => { - let data = (self.replacer)(buf[i]); - - Ok(i + self.output_next(&mut buf[i..], u - i, &data)) - } - None => Ok(u), - } - // todo: use memchr2? - } - Err(e) => Err(e), - } - } +fn add_newline(ar: impl AsyncRead + Send) -> impl AsyncRead + Send { + ar.chain(Cursor::new(&[b'\n'])) } pub struct PostprocPrefix {} @@ -132,20 +45,20 @@ impl GetMetadata for PostprocPrefix { impl FileAdapter for PostprocPrefix { fn adapt<'a>( &self, - a: super::AdaptInfo<'a>, + a: super::AdaptInfo, _detection_reason: &crate::matching::FileMatcher, - ) -> Result> { - let read = EnsureEndsWithNewline::new(postproc_prefix( + ) -> Result { + let read = add_newline(postproc_prefix( &a.line_prefix, postproc_encoding(&a.line_prefix, a.inp)?, )); // keep adapt info (filename etc) except replace inp let ai = AdaptInfo { - inp: Box::new(read), + inp: Box::pin(read), postprocess: false, ..a }; - Ok(Box::new(SingleAdaptedFileAsIter::new(ai))) + Ok(Box::pin(tokio_stream::once(ai))) } } @@ -158,11 +71,13 @@ impl Read for ReadErr { } }*/ -pub fn postproc_encoding<'a, R: AsyncRead + 'a>( +pub fn postproc_encoding( line_prefix: &str, - inp: R, -) -> Result> { - // TODO: parse these options from ripgrep's configuration + inp: impl AsyncRead + Send + 'static, +) -> Result>> { + Ok(Box::pin(inp)) + // panic!("todo: implement"); + /*// TODO: parse these options from ripgrep's configuration let encoding = None; // detect bom but usually assume utf8 let bom_sniffing = true; let mut decode_builder = DecodeReaderBytesBuilder::new(); @@ -199,24 +114,39 @@ pub fn postproc_encoding<'a, R: AsyncRead + 'a>( } Ok(Box::new( std::io::Cursor::new(fourk).chain(beginning.into_inner()), - )) + ))*/ } -pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { - let line_prefix = line_prefix.to_string(); // clone since we need it later - ByteReplacer { - inner: inp, - next_read: format!("{}", line_prefix).into_bytes(), - haystacker: Box::new(|buf| memchr::memchr(b'\n', buf)), - replacer: Box::new(move |_| format!("\n{}", line_prefix).into_bytes()), - } +pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl AsyncRead + Send { + let line_prefix_n = format!("\n{}", line_prefix); // clone since we need it later + let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes()); + let regex = regex::bytes::Regex::new("\n").unwrap(); + let mut inp_stream = ReaderStream::new(inp); + let oup_stream = stream! { + yield Ok(line_prefix_o); + for await chunk in inp_stream { + match chunk { + Err(e) => yield Err(e), + Ok(chunk) => { + if chunk.contains(&b'\n') { + yield Ok(Bytes::copy_from_slice(®ex.replace_all(&chunk, line_prefix_n.as_bytes()))); + } else { + yield Ok(chunk); + } + } + } + } + }; + StreamReader::new(oup_stream) } pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { let line_prefix = line_prefix.to_string(); // clone since let mut page_count = 1; - ByteReplacer { + panic!("todo!"); + tokio::io::empty() + /*ByteReplacer { inner: inp, next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(), haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)), @@ -228,26 +158,29 @@ pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl Async } _ => b"[[imposs]]".to_vec(), }), - } + }*/ } #[cfg(test)] mod tests { use super::*; use anyhow::Result; + use tokio::pin; use std::io::Read; - fn test_from_strs(pagebreaks: bool, line_prefix: &str, a: &str, b: &str) -> Result<()> { - test_from_bytes(pagebreaks, line_prefix, a.as_bytes(), b) + async fn test_from_strs(pagebreaks: bool, line_prefix: &str, a: &'static str, b: &str) -> Result<()> { + test_from_bytes(pagebreaks, line_prefix, a.as_bytes(), b).await } - fn test_from_bytes(pagebreaks: bool, line_prefix: &str, a: &[u8], b: &str) -> Result<()> { + async fn test_from_bytes(pagebreaks: bool, line_prefix: &str, a: &'static [u8], b: &str) -> Result<()> { let mut oup = Vec::new(); let inp = postproc_encoding("", a)?; if pagebreaks { - postproc_pagebreaks(line_prefix, inp).read_to_end(&mut oup)?; + postproc_pagebreaks(line_prefix, inp).read_to_end(&mut oup).await?; } else { - postproc_prefix(line_prefix, inp).read_to_end(&mut oup)?; + let x = postproc_prefix(line_prefix, inp); + pin!(x); + x.read_to_end(&mut oup).await?; } let c = String::from_utf8_lossy(&oup); if b != c { @@ -262,32 +195,32 @@ mod tests { Ok(()) } - #[test] - fn post1() -> Result<()> { + #[tokio::test] + async fn post1() -> Result<()> { let inp = "What is this\nThis is a test\nFoo"; let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo"; - test_from_strs(true, "", inp, oup)?; + test_from_strs(true, "", inp, oup).await?; println!("\n\n\n\n"); let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!"; let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo\nPage 2:\nPage 2:Helloooo\nPage 2:How are you?\nPage 3:\nPage 3:Great!"; - test_from_strs(true, "", inp, oup)?; + test_from_strs(true, "", inp, oup).await?; let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!"; let oup = "foo.pdf:What is this\nfoo.pdf:This is a test\nfoo.pdf:Foo\x0c\nfoo.pdf:Helloooo\nfoo.pdf:How are you?\x0c\nfoo.pdf:Great!"; - test_from_strs(false, "foo.pdf:", inp, oup)?; + test_from_strs(false, "foo.pdf:", inp, oup).await?; test_from_strs( false, "foo:", "this is a test \n\n \0 foo", "foo:[rga: binary data]", - )?; - test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]")?; + ).await?; + test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?; Ok(()) } diff --git a/src/bin/rga.rs b/src/bin/rga.rs index 67177bd..65e3115 100644 --- a/src/bin/rga.rs +++ b/src/bin/rga.rs @@ -15,7 +15,7 @@ fn list_adapters(args: RgaConfig) -> Result<()> { let (enabled_adapters, disabled_adapters) = get_all_adapters(args.custom_adapters.clone()); println!("Adapters:\n"); - let print = |adapter: std::rc::Arc| { + let print = |adapter: std::sync::Arc| { let meta = adapter.metadata(); let matchers = meta .fast_matchers diff --git a/src/preproc.rs b/src/preproc.rs index 661375d..5cdc2d3 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -65,6 +65,10 @@ async fn buf_choose_adapter(ai: AdaptInfo) -> Result { &mut inp, ) .await?; + let ai = AdaptInfo { + inp: Box::pin(inp), + ..ai + }; let (a, b, c) = match adapter { Some(x) => x, None => { @@ -91,10 +95,6 @@ async fn buf_choose_adapter(ai: AdaptInfo) -> Result { } } }; - let ai = AdaptInfo { - inp: Box::pin(inp), - ..ai - }; Ok(Ret::Recurse(ai, a, b, c)) } @@ -215,7 +215,7 @@ async fn adapt_caching( } } -fn loop_adapt( +pub fn loop_adapt( adapter: &dyn FileAdapter, detection_reason: FileMatcher, ai: AdaptInfo, @@ -233,8 +233,8 @@ fn loop_adapt( for await file in inp { match buf_choose_adapter(file).await.expect("todo: handle") { Ret::Recurse(ai, adapter, detection_reason, active_adapters) => { - for await file in loop_adapt(adapter.as_ref(), detection_reason, file).expect("todo: handle") { - yield file; + for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai).expect("todo: handle") { + yield ifile; } } Ret::Passthrough(ai) => {