fixes to postprocessing

This commit is contained in:
phiresky 2021-08-26 14:54:42 +02:00
parent 38e947fb12
commit 020000cc77
8 changed files with 129 additions and 30 deletions

View File

@ -89,7 +89,7 @@ lazy_static! {
"--from={file_extension}",
"--to=plain",
"--wrap=none",
"--atx-headers"
"--markdown-headings=atx"
]),
disabled_by_default: None,
match_only_by_mime: None

View File

@ -2,14 +2,20 @@
//impl<T> FileAdapter for T where T: RunFnAdapter {}
use anyhow::Context;
use anyhow::Result;
use encoding_rs_io::DecodeReaderBytesBuilder;
use std::{cmp::min, io::Read};
use std::{
cmp::min,
io::{BufRead, BufReader, Read},
};
use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter};
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
/** pass through, except adding \n at the end */
pub struct EnsureEndsWithNewline<R: Read> {
inner: R,
added_newline: bool,
@ -112,7 +118,7 @@ impl GetMetadata for PostprocPrefix {
static ref METADATA: AdapterMeta = AdapterMeta {
name: "postprocprefix".to_owned(),
version: 1,
description: "Adds the line prefix to each line".to_owned(),
description: "Adds the line prefix to each line (e.g. the filename within a zip)".to_owned(),
recurses: true,
fast_matchers: vec![],
slow_matchers: None,
@ -129,7 +135,10 @@ impl FileAdapter for PostprocPrefix {
a: super::AdaptInfo<'a>,
_detection_reason: &crate::matching::FileMatcher,
) -> Result<AdaptedFilesIterBox<'a>> {
let read = EnsureEndsWithNewline::new(postproc_prefix(&a.line_prefix, a.inp)?);
let read = EnsureEndsWithNewline::new(postproc_prefix(
&a.line_prefix,
postproc_encoding(&a.line_prefix, a.inp)?,
));
// keep adapt info (filename etc) except replace inp
let ai = AdaptInfo {
inp: Box::new(read),
@ -140,21 +149,73 @@ impl FileAdapter for PostprocPrefix {
}
}
pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> Result<impl Read> {
/*struct ReadErr {
err: Fn() -> std::io::Error,
}
impl Read for ReadErr {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
Err(self.err())
}
}*/
pub fn postproc_encoding<'a, R: Read+'a>(line_prefix: &str, inp: R) -> Result<Box<dyn Read + 'a>> {
// TODO: parse these options from ripgrep's configuration
let encoding = None; // detect bom but usually assume utf8
let bom_sniffing = true;
let mut decode_builder = DecodeReaderBytesBuilder::new();
// https://github.com/BurntSushi/ripgrep/blob/a7d26c8f144a4957b75f71087a66692d0b25759a/grep-searcher/src/searcher/mod.rs#L706
// this detects utf-16 BOMs and transcodes to utf-8 if they are present
// it does not detect any other char encodings. that would require https://github.com/hsivonen/chardetng or similar but then binary detection is hard (?)
let inp = decode_builder
.encoding(encoding)
.utf8_passthru(true)
.strip_bom(bom_sniffing)
.bom_override(true)
.bom_sniffing(bom_sniffing)
.build(inp);
// check for binary content in first 8kB
// read the first 8kB into a buffer, check for null bytes, then return the buffer concatenated with the rest of the file
let mut fourk = Vec::with_capacity(1 << 13);
let mut beginning = inp.take(1 << 13);
beginning.read_to_end(&mut fourk)?;
if fourk.contains(&0u8) {
log::debug!("detected binary");
let v = "[rga: binary data]";
return Ok(Box::new(std::io::Cursor::new(
v
)));
/*let err = std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("{}[rga: binary data]", line_prefix),
);
return Err(err).context("");
return ReadErr {
err,
};*/
}
Ok(Box::new(
std::io::Cursor::new(fourk).chain(beginning.into_inner()),
))
}
pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> impl Read {
let line_prefix = line_prefix.to_string(); // clone since we need it later
Ok(ByteReplacer {
ByteReplacer {
inner: inp,
next_read: format!("{}", line_prefix).into_bytes(),
haystacker: Box::new(|buf| memchr::memchr(b'\n', buf)),
replacer: Box::new(move |_| format!("\n{}", line_prefix).into_bytes()),
})
}
}
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl Read) -> Result<impl Read> {
pub fn postproc_pagebreaks(line_prefix: &str, inp: impl Read) -> impl Read {
let line_prefix = line_prefix.to_string(); // clone since
let mut page_count = 1;
Ok(ByteReplacer {
ByteReplacer {
inner: inp,
next_read: format!("{}Page {}:", line_prefix, page_count).into_bytes(),
haystacker: Box::new(|buf| memchr::memchr2(b'\n', b'\x0c', buf)),
@ -166,21 +227,35 @@ pub fn postproc_pagebreaks(line_prefix: &str, inp: impl Read) -> Result<impl Rea
}
_ => b"[[imposs]]".to_vec(),
}),
})
}
}
#[cfg(test)]
mod tests {
use super::postproc_pagebreaks;
use super::*;
use anyhow::Result;
use std::io::Read;
fn test_from_strs(a: &str, b: &str) -> Result<()> {
fn test_from_strs(pagebreaks: bool, line_prefix: &str, a: &str, b: &str) -> Result<()> {
test_from_bytes(pagebreaks, line_prefix, a.as_bytes(), b)
}
fn test_from_bytes(pagebreaks: bool, line_prefix: &str, a: &[u8], b: &str) -> Result<()> {
let mut oup = Vec::new();
postproc_pagebreaks("", a.as_bytes())?.read_to_end(&mut oup)?;
let inp = postproc_encoding("", a)?;
if pagebreaks {
postproc_pagebreaks(line_prefix, inp).read_to_end(&mut oup)?;
} else {
postproc_prefix(line_prefix, inp).read_to_end(&mut oup)?;
}
let c = String::from_utf8_lossy(&oup);
if b != c {
anyhow::bail!("{}\nshould be\n{}\nbut is\n{}", a, b, c);
anyhow::bail!(
"`{}`\nshould be\n`{}`\nbut is\n`{}`",
String::from_utf8_lossy(&a),
b,
c
);
}
Ok(())
@ -191,15 +266,33 @@ mod tests {
let inp = "What is this\nThis is a test\nFoo";
let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo";
test_from_strs(inp, oup)?;
test_from_strs(true, "", inp, oup)?;
println!("\n\n\n\n");
let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!";
let oup = "Page 1:What is this\nPage 1:This is a test\nPage 1:Foo\nPage 2:\nPage 2:Helloooo\nPage 2:How are you?\nPage 3:\nPage 3:Great!";
test_from_strs(inp, oup)?;
test_from_strs(true, "", inp, oup)?;
let inp = "What is this\nThis is a test\nFoo\x0c\nHelloooo\nHow are you?\x0c\nGreat!";
let oup = "foo.pdf:What is this\nfoo.pdf:This is a test\nfoo.pdf:Foo\x0c\nfoo.pdf:Helloooo\nfoo.pdf:How are you?\x0c\nfoo.pdf:Great!";
test_from_strs(false, "foo.pdf:", inp, oup)?;
test_from_strs(false, "foo:", "this is a test \n\n \0 foo", "foo:[rga: binary data]")?;
test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]")?;
Ok(())
}
/*#[test]
fn chardet() -> Result<()> {
let mut d = chardetng::EncodingDetector::new();
let mut v = Vec::new();
std::fs::File::open("/home/phire/passwords-2018.kdbx.old").unwrap().read_to_end(&mut v).unwrap();
d.feed(&v, false);
println!("foo {:?}", d.guess(None, true));
Ok(())
}*/
}

View File

@ -61,10 +61,6 @@ pub fn postproc_line_prefix(
pub trait SpawningFileAdapterTrait: GetMetadata {
fn get_exe(&self) -> &str;
fn command(&self, filepath_hint: &Path, command: Command) -> Result<Command>;
/*fn postproc(&self, line_prefix: &str, inp: &mut dyn Read, oup: &mut dyn Write) -> Result<()> {
postproc_line_prefix(line_prefix, inp, oup)
}*/
}
pub struct SpawningFileAdapter {

View File

@ -52,7 +52,7 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> {
fn next<'b>(&'b mut self) -> Option<AdaptInfo<'b>> {
let line_prefix = &self.inp.line_prefix;
let filepath_hint = &self.inp.filepath_hint;
let archive_recursion_depth = 1;
let archive_recursion_depth = &self.inp.archive_recursion_depth;
let postprocess = self.inp.postprocess;
::zip::read::read_zipfile_from_stream(&mut self.inp.inp)
.unwrap()

View File

@ -33,7 +33,15 @@ fn main() -> anyhow::Result<()> {
let start = Instant::now();
let mut oup = rga_preproc(ai).context("during preprocessing")?;
debug!("finding and starting adapter took {}", print_dur(start));
std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?;
let res = std::io::copy(&mut oup, &mut o);
if let Err(e) = res {
if e.kind() == std::io::ErrorKind::BrokenPipe {
// happens if e.g. ripgrep detects binary data in the pipe so it cancels reading
debug!("output cancelled (broken pipe)");
} else {
Err(e).context("copying adapter output to stdout {}")?;
}
}
debug!("running adapter took {} total", print_dur(start));
Ok(())
}

View File

@ -9,7 +9,7 @@ use std::io::{Read, Write};
*/
pub struct CachingReader<R: Read> {
max_cache_size: usize,
zstd_writer: Option<zstd::stream::write::Encoder<Vec<u8>>>,
zstd_writer: Option<zstd::stream::write::Encoder<'static, Vec<u8>>>,
inp: R,
bytes_written: u64,
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,

View File

@ -2,7 +2,6 @@
// extended to support sending io errors
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/pipe/0.3.0")]
#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))]
//! Synchronous in-memory pipe

View File

@ -1,6 +1,7 @@
use crate::{config::CacheConfig, print_bytes, print_dur};
use anyhow::{format_err, Context, Result};
use log::*;
use rkv::backend::{BackendEnvironmentBuilder, LmdbEnvironment};
use std::{fmt::Display, path::Path, time::Instant};
pub trait PreprocCache: Send + Sync {
@ -20,16 +21,18 @@ pub trait PreprocCache: Send + Sync {
}
/// opens a LMDB cache
fn open_cache_db(path: &Path) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>> {
fn open_cache_db(path: &Path) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> {
std::fs::create_dir_all(path)?;
use rkv::backend::LmdbEnvironmentFlags;
rkv::Manager::singleton()
rkv::Manager::<LmdbEnvironment>::singleton()
.write()
.map_err(|_| format_err!("could not write cache db manager"))?
.get_or_create(path, |p| {
let mut builder = rkv::Rkv::environment_builder();
let mut builder = rkv::Rkv::environment_builder::<rkv::backend::Lmdb>();
builder
.set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
.set_flags(rkv::EnvironmentFlags::NO_SYNC)
.set_flags(rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
// i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
// LmdbError(ReadersFull). Those "open readers" stay even after the corresponding processes exit.
// hope setting this doesn't break integrity
@ -38,13 +41,13 @@ fn open_cache_db(path: &Path) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rk
.set_map_size(2 * 1024 * 1024 * 1024)
.set_max_dbs(100)
.set_max_readers(128);
rkv::Rkv::from_env(p, builder)
rkv::Rkv::from_builder(p, builder)
})
.map_err(|e| format_err!("could not get/create cache db: {}", e))
}
pub struct LmdbCache {
db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv>>,
db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>,
}
impl LmdbCache {