fixup! Implement async postproc_pagebreaks

Joseph LaFreniere 2022-12-23 18:35:37 -06:00
parent f19e173585
commit d3803f892c
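
For orientation only (not part of the commit): a minimal sketch of how the reworked `postproc_pagebreaks` could be exercised, assuming it is called from within the same crate and that `tokio` is available with its `macros` and `io-util` features. Any byte source implementing `AsyncRead + Send`, such as an in-memory slice, works as input; the input string and the expected output in the comments are illustrative.

use tokio::io::AsyncReadExt; // brings `read_to_end` into scope for the returned reader

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Two pages separated by an ASCII form feed (\x0c), as emitted by `pdftotext`.
    let input: &[u8] = b"first line\nsecond line\x0cthird line";
    let mut output = Vec::new();

    // The reader returned by `postproc_pagebreaks` yields the same bytes with a
    // "\nPage N:" prefix inserted per line, incrementing N at each form feed.
    postproc_pagebreaks(input).read_to_end(&mut output).await?;

    // Expected shape (cf. the test in the diff below):
    // "\nPage 1:first line\nPage 1:second line\nPage 2:third line"
    println!("{}", String::from_utf8_lossy(&output));
    Ok(())
}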


@@ -146,40 +146,54 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As
 /// Adds the prefix "Page N:" to each line,
 /// where N starts at one and is incremented for each ASCII Form Feed character in the input stream.
 /// ASCII form feeds are the page delimiters output by `pdftotext`.
-pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead {
-    let form_feed = b'\x0c';
-    let regex = regex::bytes::Regex::new("\n").unwrap();
-    let mut page_count = 1;
-    let mut line_prefix = format!("\n{}Page {}:", line_prefix, page_count);
-    let inp_stream = ReaderStream::new(inp);
-    let oup_stream = stream! {
-        for await chunk in inp_stream {
+pub fn postproc_pagebreaks(input: impl AsyncRead + Send) -> impl AsyncRead + Send {
+    let regex_linefeed = regex::bytes::Regex::new(r"\x0c").unwrap();
+    let regex_newline = regex::bytes::Regex::new("\n").unwrap();
+    let mut page_count: i32 = 1;
+    let mut page_prefix: String = format!("\nPage {}:", page_count);
+    let input_stream = ReaderStream::new(input);
+    let output_stream = stream! {
+        for await chunk in input_stream {
             match chunk {
                 Err(e) => yield Err(e),
                 Ok(chunk) => {
-                    let chunk_iter = chunk.split_inclusive(|byte| byte == &form_feed);
-                    for sub_chunk in chunk_iter {
-                        if sub_chunk == [form_feed] {
-                            page_count += 1;
-                            line_prefix = format!("\n{}Page {}:", line_prefix, page_count);
-                        } else {
-                            yield Ok(Bytes::copy_from_slice(&regex.replace_all(&sub_chunk, line_prefix.as_bytes())));
-                        }
+                    let sub_chunks = regex_linefeed.split(&chunk);
+                    for sub_chunk in sub_chunks {
+                        // println!("{}", String::from_utf8_lossy(page_prefix.as_bytes()));
+                        yield Ok(Bytes::copy_from_slice(page_prefix.as_bytes()));
+                        yield Ok(Bytes::copy_from_slice(&regex_newline.replace_all(&sub_chunk, page_prefix.as_bytes())));
+                        page_count += 1;
+                        page_prefix = format!("\nPage {}:", page_count);
                     }
                 }
             }
         }
     };
-    StreamReader::new(oup_stream)
+    Box::pin(StreamReader::new(output_stream))
 }
 #[cfg(test)]
 mod tests {
     use super::*;
     use anyhow::Result;
-    use std::io::Read;
-    use tokio::pin;
+    use tokio_test::io::Builder;
+    use tokio_test::io::Mock;
+    #[tokio::test]
+    async fn test_with_pagebreaks() {
+        let mut output: Vec<u8> = Vec::new();
+        let mock: Mock = Builder::new()
+            .read(b"Hello\nWorld\x0cFoo Bar\n\x0cTest")
+            .build();
+        let res = postproc_pagebreaks(mock).read_to_end(&mut output).await;
+        println!("{}", String::from_utf8_lossy(&output));
+        assert!(matches!(res, Ok(_)));
+        assert_eq!(
+            output,
+            b"\nPage 1:Hello\nPage 1:World\nPage 2:Foo Bar\nPage 2:\nPage 3:Test"
+        );
+    }
     async fn test_from_strs(
         pagebreaks: bool,