partial recursion + postproc

This commit is contained in:
phiresky 2022-11-13 00:31:25 +01:00
parent 937b1a81ac
commit 54799f1452
5 changed files with 74 additions and 139 deletions

View File

@ -1,7 +1,7 @@
pub mod custom; pub mod custom;
// pub mod decompress; // pub mod decompress;
// pub mod ffmpeg; // pub mod ffmpeg;
// pub mod postproc; pub mod postproc;
// pub mod pdfpages; // pub mod pdfpages;
pub mod spawning; pub mod spawning;
use std::sync::Arc; use std::sync::Arc;

View File

@ -216,6 +216,7 @@ impl CustomAdapterConfig {
mod test { mod test {
use super::super::FileAdapter; use super::super::FileAdapter;
use super::*; use super::*;
use crate::preproc::loop_adapt;
use crate::test_utils::*; use crate::test_utils::*;
use anyhow::Result; use anyhow::Result;
use tokio::fs::File; use tokio::fs::File;
@ -232,7 +233,8 @@ mod test {
let filepath = test_data_dir().join("short.pdf"); let filepath = test_data_dir().join("short.pdf");
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = adapter.adapt(a, &d)?; // let r = adapter.adapt(a, &d)?;
let r = loop_adapt(&adapter, d, a)?;
let o = adapted_to_vec(r).await?; let o = adapted_to_vec(r).await?;
assert_eq!( assert_eq!(
String::from_utf8(o)?, String::from_utf8(o)?,

View File

@ -4,111 +4,24 @@
use anyhow::Context; use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use bytes::Bytes;
use encoding_rs_io::DecodeReaderBytesBuilder; use encoding_rs_io::DecodeReaderBytesBuilder;
use tokio::io::AsyncRead; use tokio::io::{AsyncRead, AsyncReadExt};
use async_stream::stream;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use std::io::Cursor;
use std::pin::Pin;
use std::{ use std::{
cmp::min, cmp::min,
}; };
use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter}; use crate::adapted_iter::{AdaptedFilesIterBox};
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
/** pass through, except adding \n at the end */ fn add_newline(ar: impl AsyncRead + Send) -> impl AsyncRead + Send {
pub struct EnsureEndsWithNewline<R: AsyncRead> { ar.chain(Cursor::new(&[b'\n']))
inner: R,
added_newline: bool,
}
impl<R: AsyncRead> EnsureEndsWithNewline<R> {
pub fn new(r: R) -> EnsureEndsWithNewline<R> {
EnsureEndsWithNewline {
inner: r,
added_newline: false,
}
}
}
impl<R: AsyncRead> AsyncRead for EnsureEndsWithNewline<R> {
fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self.inner.read(buf) {
Ok(0) => {
if self.added_newline {
Ok(0)
} else {
buf[0] = b'\n';
self.added_newline = true;
Ok(1)
}
}
Ok(n) => Ok(n),
Err(e) => Err(e),
}
}
}
struct ByteReplacer<R>
where
R:AsyncRead,
{
inner: R,
next_read: Vec<u8>,
replacer: Box<dyn FnMut(u8) -> Vec<u8>>,
haystacker: Box<dyn Fn(&[u8]) -> Option<usize>>,
}
impl<R> ByteReplacer<R>
where
R: AsyncRead,
{
fn output_next(&mut self, buf: &mut [u8], buf_valid_until: usize, replacement: &[u8]) -> usize {
let after_part1 = Vec::from(&buf[1..buf_valid_until]);
/*let mut after_part = Vec::with_capacity(replacement.len() + replaced_len);
after_part.extend_from_slice(replacement);
after_part.extend_from_slice(&buf[..replaced_len]);*/
let writeable_count = min(buf.len(), replacement.len());
buf[..writeable_count].copy_from_slice(&replacement[0..writeable_count]);
let after_rep = &replacement[writeable_count..];
let mut ov = Vec::new();
ov.extend_from_slice(&after_rep);
ov.extend_from_slice(&after_part1);
ov.extend_from_slice(&self.next_read);
self.next_read = ov;
return writeable_count;
}
}
impl<R> AsyncRead for ByteReplacer<R>
where
R: AsyncRead,
{
fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let read = if self.next_read.len() > 0 {
let count = std::cmp::min(self.next_read.len(), buf.len());
buf[0..count].copy_from_slice(&self.next_read[0..count]);
self.next_read.drain(0..count).count();
Ok(count)
} else {
self.inner.read(buf)
};
match read {
Ok(u) => {
match (self.haystacker)(&buf[0..u]) {
Some(i) => {
let data = (self.replacer)(buf[i]);
Ok(i + self.output_next(&mut buf[i..], u - i, &data))
}
None => Ok(u),
}
// todo: use memchr2?
}
Err(e) => Err(e),
}
}
} }
pub struct PostprocPrefix {} pub struct PostprocPrefix {}
@ -132,20 +45,20 @@ impl GetMetadata for PostprocPrefix {
impl FileAdapter for PostprocPrefix { impl FileAdapter for PostprocPrefix {
fn adapt<'a>( fn adapt<'a>(
&self, &self,
a: super::AdaptInfo<'a>, a: super::AdaptInfo,
_detection_reason: &crate::matching::FileMatcher, _detection_reason: &crate::matching::FileMatcher,
) -> Result<AdaptedFilesIterBox<'a>> { ) -> Result<AdaptedFilesIterBox> {
let read = EnsureEndsWithNewline::new(postproc_prefix( let read = add_newline(postproc_prefix(
&a.line_prefix, &a.line_prefix,
postproc_encoding(&a.line_prefix, a.inp)?, postproc_encoding(&a.line_prefix, a.inp)?,
)); ));
// keep adapt info (filename etc) except replace inp // keep adapt info (filename etc) except replace inp
let ai = AdaptInfo { let ai = AdaptInfo {
inp: Box::new(read), inp: Box::pin(read),
postprocess: false, postprocess: false,
..a ..a
}; };
Ok(Box::new(SingleAdaptedFileAsIter::new(ai))) Ok(Box::pin(tokio_stream::once(ai)))
} }
} }
@ -158,11 +71,13 @@ impl Read for ReadErr {
} }
}*/ }*/
pub fn postproc_encoding<'a, R: AsyncRead + 'a>( pub fn postproc_encoding(
line_prefix: &str, line_prefix: &str,
inp: R, inp: impl AsyncRead + Send + 'static,
) -> Result<Box<dyn AsyncRead + 'a>> { ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
// TODO: parse these options from ripgrep's configuration Ok(Box::pin(inp))
// panic!("todo: implement");
/*// TODO: parse these options from ripgrep's configuration
let encoding = None; // detect bom but usually assume utf8 let encoding = None; // detect bom but usually assume utf8
let bom_sniffing = true; let bom_sniffing = true;
let mut decode_builder = DecodeReaderBytesBuilder::new(); let mut decode_builder = DecodeReaderBytesBuilder::new();
@ -199,24 +114,39 @@ pub fn postproc_encoding<'a, R: AsyncRead + 'a>(
} }
Ok(Box::new( Ok(Box::new(
std::io::Cursor::new(fourk).chain(beginning.into_inner()), std::io::Cursor::new(fourk).chain(beginning.into_inner()),
)) ))*/
} }
pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead + Send) -> impl AsyncRead + Send {
let line_prefix = line_prefix.to_string(); // clone since we need it later let line_prefix_n = format!("\n{}", line_prefix); // clone since we need it later
ByteReplacer { let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes());
inner: inp, let regex = regex::bytes::Regex::new("\n").unwrap();
next_read: format!("{}", line_prefix).into_bytes(), let mut inp_stream = ReaderStream::new(inp);
haystacker: Box::new(|buf| memchr::memchr(b'\n', buf)), let oup_stream = stream! {
replacer: Box::new(move |_| format!("\n{}", line_prefix).into_bytes()), yield Ok(line_prefix_o);
for await chunk in inp_stream {
match chunk {
Err(e) => yield Err(e),
Ok(chunk) => {
if chunk.contains(&b'\n') {
yield Ok(Bytes::copy_from_slice(&regex.replace_all(&chunk, line_prefix_n.as_bytes())));
} else {
yield Ok(chunk);
} }
}
}
}
};
StreamReader::new(oup_stream)
} }
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead {
let line_prefix = line_prefix.to_string(); // clone since let line_prefix = line_prefix.to_string(); // clone since
let mut page_count = 1; let mut page_count = 1;
ByteReplacer { panic!("todo!");
tokio::io::empty()
/*ByteReplacer {
inner: inp, inner: inp,
next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(), next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(),
haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)), haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)),
@ -228,26 +158,29 @@ pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl Async
} }
_ => b"[[imposs]]".to_vec(), _ => b"[[imposs]]".to_vec(),
}), }),
} }*/
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use anyhow::Result; use anyhow::Result;
use tokio::pin;
use std::io::Read; use std::io::Read;
fn test_from_strs(pagebreaks: bool, line_prefix: &str, a: &str, b: &str) -> Result<()> { async fn test_from_strs(pagebreaks: bool, line_prefix: &str, a: &'static str, b: &str) -> Result<()> {
test_from_bytes(pagebreaks, line_prefix, a.as_bytes(), b) test_from_bytes(pagebreaks, line_prefix, a.as_bytes(), b).await
} }
fn test_from_bytes(pagebreaks: bool, line_prefix: &str, a: &[u8], b: &str) -> Result<()> { async fn test_from_bytes(pagebreaks: bool, line_prefix: &str, a: &'static [u8], b: &str) -> Result<()> {
let mut oup = Vec::new(); let mut oup = Vec::new();
let inp = postproc_encoding("", a)?; let inp = postproc_encoding("", a)?;
if pagebreaks { if pagebreaks {
postproc_pagebreaks(line_prefix, inp).read_to_end(&mut oup)?; postproc_pagebreaks(line_prefix, inp).read_to_end(&mut oup).await?;
} else { } else {
postproc_prefix(line_prefix, inp).read_to_end(&mut oup)?; let x = postproc_prefix(line_prefix, inp);
pin!(x);
x.read_to_end(&mut oup).await?;
} }
let c = String::from_utf8_lossy(&oup); let c = String::from_utf8_lossy(&oup);
if b != c { if b != c {
@ -262,32 +195,32 @@ mod tests {
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn post1() -> Result<()> { async fn post1() -> Result<()> {
let inp = "What is this\nThis is a test\nFoo"; let inp = "What is this\nThis is a test\nFoo";
let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo"; let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo";
test_from_strs(true, "", inp, oup)?; test_from_strs(true, "", inp, oup).await?;
println!("\n\n\n\n"); println!("\n\n\n\n");
let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!"; let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!";
let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo\nPage 2:\nPage 2:Helloooo\nPage 2:How are you?\nPage 3:\nPage 3:Great!"; let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo\nPage 2:\nPage 2:Helloooo\nPage 2:How are you?\nPage 3:\nPage 3:Great!";
test_from_strs(true, "", inp, oup)?; test_from_strs(true, "", inp, oup).await?;
let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!"; let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!";
let oup = "foo.pdf:What is this\nfoo.pdf:This is a test\nfoo.pdf:Foo\x0c\nfoo.pdf:Helloooo\nfoo.pdf:How are you?\x0c\nfoo.pdf:Great!"; let oup = "foo.pdf:What is this\nfoo.pdf:This is a test\nfoo.pdf:Foo\x0c\nfoo.pdf:Helloooo\nfoo.pdf:How are you?\x0c\nfoo.pdf:Great!";
test_from_strs(false, "foo.pdf:", inp, oup)?; test_from_strs(false, "foo.pdf:", inp, oup).await?;
test_from_strs( test_from_strs(
false, false,
"foo:", "foo:",
"this is a test \n\n \0 foo", "this is a test \n\n \0 foo",
"foo:[rga: binary data]", "foo:[rga: binary data]",
)?; ).await?;
test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]")?; test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?;
Ok(()) Ok(())
} }

View File

@ -15,7 +15,7 @@ fn list_adapters(args: RgaConfig) -> Result<()> {
let (enabled_adapters, disabled_adapters) = get_all_adapters(args.custom_adapters.clone()); let (enabled_adapters, disabled_adapters) = get_all_adapters(args.custom_adapters.clone());
println!("Adapters:\n"); println!("Adapters:\n");
let print = |adapter: std::rc::Arc<dyn FileAdapter>| { let print = |adapter: std::sync::Arc<dyn FileAdapter>| {
let meta = adapter.metadata(); let meta = adapter.metadata();
let matchers = meta let matchers = meta
.fast_matchers .fast_matchers

View File

@ -65,6 +65,10 @@ async fn buf_choose_adapter(ai: AdaptInfo) -> Result<Ret> {
&mut inp, &mut inp,
) )
.await?; .await?;
let ai = AdaptInfo {
inp: Box::pin(inp),
..ai
};
let (a, b, c) = match adapter { let (a, b, c) = match adapter {
Some(x) => x, Some(x) => x,
None => { None => {
@ -91,10 +95,6 @@ async fn buf_choose_adapter(ai: AdaptInfo) -> Result<Ret> {
} }
} }
}; };
let ai = AdaptInfo {
inp: Box::pin(inp),
..ai
};
Ok(Ret::Recurse(ai, a, b, c)) Ok(Ret::Recurse(ai, a, b, c))
} }
@ -215,7 +215,7 @@ async fn adapt_caching(
} }
} }
fn loop_adapt( pub fn loop_adapt(
adapter: &dyn FileAdapter, adapter: &dyn FileAdapter,
detection_reason: FileMatcher, detection_reason: FileMatcher,
ai: AdaptInfo, ai: AdaptInfo,
@ -233,8 +233,8 @@ fn loop_adapt(
for await file in inp { for await file in inp {
match buf_choose_adapter(file).await.expect("todo: handle") { match buf_choose_adapter(file).await.expect("todo: handle") {
Ret::Recurse(ai, adapter, detection_reason, active_adapters) => { Ret::Recurse(ai, adapter, detection_reason, active_adapters) => {
for await file in loop_adapt(adapter.as_ref(), detection_reason, file).expect("todo: handle") { for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai).expect("todo: handle") {
yield file; yield ifile;
} }
} }
Ret::Passthrough(ai) => { Ret::Passthrough(ai) => {