Implement async postproc_pagebreaks

This commit is contained in:
Joseph LaFreniere 2022-12-22 15:15:33 -06:00
parent 0d75d5bcc2
commit af168efe1a
No known key found for this signature in database
GPG Key ID: EE236AA0141EFCA3

View File

@ -119,14 +119,12 @@ pub fn postproc_encoding(
))*/ ))*/
} }
/** /// Adds the given prefix to each line in an `AsyncRead`.
* adds the given prefix to each line in a AsyncRead
*/
pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl AsyncRead + Send { pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl AsyncRead + Send {
let line_prefix_n = format!("\n{}", line_prefix); // clone since we need it later let line_prefix_n = format!("\n{}", line_prefix); // clone since we need it later
let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes()); let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes());
let regex = regex::bytes::Regex::new("\n").unwrap(); let regex = regex::bytes::Regex::new("\n").unwrap();
let mut inp_stream = ReaderStream::new(inp); let inp_stream = ReaderStream::new(inp);
let oup_stream = stream! { let oup_stream = stream! {
yield Ok(line_prefix_o); yield Ok(line_prefix_o);
for await chunk in inp_stream { for await chunk in inp_stream {
@ -145,30 +143,35 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl As
StreamReader::new(oup_stream) StreamReader::new(oup_stream)
} }
/** /// Adds the prefix "Page N:" to each line,
* adds the prefix `Page N:` to each line, /// where N starts at one and is incremented for each ASCII Form Feed character in the input stream.
* where N starts at one and is incremented for each ASCII Form Feed character in the input stream. /// ASCII form feeds are the page delimiters output by `pdftotext`.
* (That's the format output by pdftotext)
*/
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead {
let line_prefix = line_prefix.to_string(); // clone since let form_feed = b'\x0c';
let mut page_count = 1; let regex = regex::bytes::Regex::new("\n").unwrap();
let mut page_count = 0;
let mut line_prefix = format!("\n{}Page {}:", line_prefix, page_count + 1);
panic!("todo!"); let inp_stream = ReaderStream::new(inp);
tokio::io::empty() let oup_stream = stream! {
/*ByteReplacer { yield Ok(Bytes::copy_from_slice(line_prefix.as_bytes()));
inner: inp, for await chunk in inp_stream {
next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(), match chunk {
haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)), Err(e) => yield Err(e),
replacer: Box::new(move |b| match b { Ok(chunk) => {
b'\n' => format!("\n{}Page {}:", line_prefix, page_count).into_bytes(), let chunk_iter = chunk.split(|byte| byte == &form_feed);
b'\x0c' => { for sub_chunk in chunk_iter {
if sub_chunk.contains(&b'\n') {
yield Ok(Bytes::copy_from_slice(&regex.replace_all(&sub_chunk, line_prefix.as_bytes())));
page_count += 1; page_count += 1;
format!("\n{}Page {}:", line_prefix, page_count).into_bytes() line_prefix = format!("\n{}Page {}:", line_prefix, page_count);
} }
_ => b"[[imposs]]".to_vec(), }
}), }
}*/ }
}
};
StreamReader::new(oup_stream)
} }
#[cfg(test)] #[cfg(test)]