From d3803f892c8f2fea714628390579ed2dbaa92940 Mon Sep 17 00:00:00 2001 From: Joseph LaFreniere Date: Fri, 23 Dec 2022 18:35:37 -0600 Subject: [PATCH] fixup! Implement async `postproc_pagebreaks` --- src/adapters/postproc.rs | 52 +++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 6e311ac..0ccfae6 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -146,40 +146,54 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As /// Adds the prefix "Page N:" to each line, /// where N starts at one and is incremented for each ASCII Form Feed character in the input stream. /// ASCII form feeds are the page delimiters output by `pdftotext`. -pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { - let form_feed = b'\x0c'; - let regex = regex::bytes::Regex::new("\n").unwrap(); - let mut page_count = 1; - let mut line_prefix = format!("\n{}Page {}:", line_prefix, page_count); +pub fn postproc_pagebreaks(input: impl AsyncRead + Send) -> impl AsyncRead + Send { + let regex_linefeed = regex::bytes::Regex::new(r"\x0c").unwrap(); + let regex_newline = regex::bytes::Regex::new("\n").unwrap(); + let mut page_count: i32 = 1; + let mut page_prefix: String = format!("\nPage {}:", page_count); - let inp_stream = ReaderStream::new(inp); - let oup_stream = stream! { - for await chunk in inp_stream { + let input_stream = ReaderStream::new(input); + let output_stream = stream! 
{ + for await chunk in input_stream { match chunk { Err(e) => yield Err(e), Ok(chunk) => { - let chunk_iter = chunk.split_inclusive(|byte| byte == &form_feed); - for sub_chunk in chunk_iter { - if sub_chunk == [form_feed] { - page_count += 1; - line_prefix = format!("\n{}Page {}:", line_prefix, page_count); - } else { - yield Ok(Bytes::copy_from_slice(&regex.replace_all(&sub_chunk, line_prefix.as_bytes()))); - } + let sub_chunks = regex_linefeed.split(&chunk); + for sub_chunk in sub_chunks { + // println!("{}", String::from_utf8_lossy(page_prefix.as_bytes())); + yield Ok(Bytes::copy_from_slice(page_prefix.as_bytes())); + yield Ok(Bytes::copy_from_slice(&regex_newline.replace_all(&sub_chunk, page_prefix.as_bytes()))); + page_count += 1; + page_prefix = format!("\nPage {}:", page_count); } } } } }; - StreamReader::new(oup_stream) + Box::pin(StreamReader::new(output_stream)) } #[cfg(test)] mod tests { use super::*; use anyhow::Result; - use std::io::Read; - use tokio::pin; + use tokio_test::io::Builder; + use tokio_test::io::Mock; + + #[tokio::test] + async fn test_with_pagebreaks() { + let mut output: Vec<u8> = Vec::new(); + let mock: Mock = Builder::new() + .read(b"Hello\nWorld\x0cFoo Bar\n\x0cTest") + .build(); + let res = postproc_pagebreaks(mock).read_to_end(&mut output).await; + println!("{}", String::from_utf8_lossy(&output)); + assert!(matches!(res, Ok(_))); + assert_eq!( + output, + b"\nPage 1:Hello\nPage 1:World\nPage 2:Foo Bar\nPage 2:\nPage 3:Test" + ); + } async fn test_from_strs( pagebreaks: bool,