From af168efe1a1054764ff619153444dd68f81cfe3b Mon Sep 17 00:00:00 2001 From: Joseph LaFreniere Date: Thu, 22 Dec 2022 15:15:33 -0600 Subject: [PATCH] Implement async `postproc_pagebreaks` --- src/adapters/postproc.rs | 53 +++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index cf80da5..7ca1bb5 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -119,14 +119,12 @@ pub fn postproc_encoding( ))*/ } -/** - * adds the given prefix to each line in a AsyncRead - */ +/// Adds the given prefix to each line in an `AsyncRead`. pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl AsyncRead + Send { let line_prefix_n = format!("\n{}", line_prefix); // clone since we need it later let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes()); let regex = regex::bytes::Regex::new("\n").unwrap(); - let mut inp_stream = ReaderStream::new(inp); + let inp_stream = ReaderStream::new(inp); let oup_stream = stream! { yield Ok(line_prefix_o); for await chunk in inp_stream { @@ -145,30 +143,35 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As StreamReader::new(oup_stream) } -/** - * adds the prefix `Page N:` to each line, - * where N starts at one and is incremented for each ASCII Form Feed character in the input stream. - * (That's the format output by pdftotext) - */ +/// Adds the prefix "Page N:" to each line, +/// where N starts at one and is incremented for each ASCII Form Feed character in the input stream. +/// ASCII form feeds are the page delimiters output by `pdftotext`. 
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { - let line_prefix = line_prefix.to_string(); // clone since - let mut page_count = 1; + let form_feed = b'\x0c'; + let regex = regex::bytes::Regex::new("\n").unwrap(); + let mut page_count = 0; + let mut line_prefix = format!("\n{}Page {}:", line_prefix, page_count + 1); - panic!("todo!"); - tokio::io::empty() - /*ByteReplacer { - inner: inp, - next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(), - haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)), - replacer: Box::new(move |b| match b { - b'\n' => format!("\n{}Page {}:", line_prefix, page_count).into_bytes(), - b'\x0c' => { - page_count += 1; - format!("\n{}Page {}:", line_prefix, page_count).into_bytes() + let inp_stream = ReaderStream::new(inp); + let oup_stream = stream! { + yield Ok(Bytes::copy_from_slice(line_prefix.as_bytes())); + for await chunk in inp_stream { + match chunk { + Err(e) => yield Err(e), + Ok(chunk) => { + let chunk_iter = chunk.split(|byte| byte == &form_feed); + for sub_chunk in chunk_iter { + if sub_chunk.contains(&b'\n') { + yield Ok(Bytes::copy_from_slice(&regex.replace_all(&sub_chunk, line_prefix.as_bytes()))); + page_count += 1; + line_prefix = format!("\n{}Page {}:", line_prefix, page_count); + } + } + } } - _ => b"[[imposs]]".to_vec(), - }), - }*/ + } + }; + StreamReader::new(oup_stream) } #[cfg(test)]