From b29e6dec0e8399c511c2cb66a784b5b0bf335361 Mon Sep 17 00:00:00 2001 From: phiresky Date: Mon, 26 Dec 2022 21:10:42 +0100 Subject: [PATCH] handle io error, add example config --- Cargo.lock | 7 ++ Cargo.toml | 1 + doc/Custom adapters.md | 9 -- doc/config.default.jsonc | 18 ++++ src/adapters/custom.rs | 24 ++++- src/adapters/postproc.rs | 2 +- src/caching_writer.rs | 4 +- src/config.rs | 34 +++---- src/lib.rs | 5 +- src/pipe.rs | 195 --------------------------------------- src/preproc.rs | 14 ++- 11 files changed, 80 insertions(+), 233 deletions(-) delete mode 100644 doc/Custom adapters.md create mode 100644 doc/config.default.jsonc delete mode 100644 src/pipe.rs diff --git a/Cargo.lock b/Cargo.lock index 54fe77b..cb6def3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -832,6 +832,12 @@ dependencies = [ "libc", ] +[[package]] +name = "json_comments" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41ee439ee368ba4a77ac70d04f14015415af8600d6c894dc1f11bd79758c57d5" + [[package]] name = "lazy_static" version = "1.4.0" @@ -1311,6 +1317,7 @@ dependencies = [ "encoding_rs_io", "env_logger", "glob", + "json_comments", "lazy_static", "log", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 2b6c1db..c1f4ad0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ encoding_rs = "0.8.24" encoding_rs_io = "0.1.7" env_logger = "0.9.0" glob = "0.3.0" +json_comments = "0.2.1" lazy_static = "1.4.0" log = "0.4.11" memchr = "2.3.3" diff --git a/doc/Custom adapters.md b/doc/Custom adapters.md deleted file mode 100644 index cba98b7..0000000 --- a/doc/Custom adapters.md +++ /dev/null @@ -1,9 +0,0 @@ -## Custom adapters - -Since [version], you can specify custom adapters that invoke external preprocessing scripts in the config file. 
- -[Todo: how] - -[Todo: why not just use --pre yourself (copy/see https://github.com/phiresky/ripgrep-all/issues/60)] - -If you think your adapter config is useful, you can share it by opening an issue with the title: "Custom adapter: xyz" diff --git a/doc/config.default.jsonc b/doc/config.default.jsonc new file mode 100644 index 0000000..28cd9a3 --- /dev/null +++ b/doc/config.default.jsonc @@ -0,0 +1,18 @@ +{ + // This file follows the JSON schema defined below. + // If you use an editor that supports JSON schema (e.g. VS Code), + // you should be getting IntelliSense and validation. + "$schema": "./config.v1.schema.json", + // The default config and schema will be regenerated if they are missing + // https://github.com/phiresky/ripgrep-all/blob/master/doc/config.default.jsonc + + // The config options are the same as the command line options, + // but with --rga- prefix removed and - replaced with _. + // e.g. --rga-no-cache becomes `"no_cache": true`. + // The only exception is the `custom_adapters` option, which can only be set in this file. 
+ + "custom_adapters": [ + // See https://github.com/phiresky/ripgrep-all/wiki for more information + // to verify if your custom adapters are picked up correctly, run `rga --rga-list-adapters` + ] +} diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 22c4fa4..1bc5253 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -1,6 +1,7 @@ use super::*; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use crate::adapted_iter::one_file; + use crate::{ adapted_iter::AdaptedFilesIterBox, expand::expand_str_ez, @@ -11,7 +12,6 @@ use async_stream::stream; use bytes::Bytes; use lazy_static::lazy_static; use log::debug; - use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::path::Path; @@ -19,8 +19,9 @@ use std::process::Stdio; use tokio::io::AsyncReadExt; use tokio::process::Child; use tokio::process::Command; -use tokio_util::io::StreamReader; +use tokio::task::JoinHandle; +use tokio_util::io::StreamReader; // mostly the same as AdapterMeta + SpawningFileAdapter #[derive(Debug, Deserialize, Serialize, JsonSchema, Default, PartialEq, Clone)] pub struct CustomAdapterConfig { @@ -155,6 +156,15 @@ fn proc_wait(mut child: Child) -> impl AsyncRead { }; StreamReader::new(s) } +/** returns an AsyncRead that is empty but returns an io error if the given task had an io error or join error */ +fn join_handle_to_stream(join: JoinHandle>) -> impl AsyncRead { + let st = stream! 
{ + join.await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??; + yield std::io::Result::Ok(Bytes::copy_from_slice(b"")) + }; + + StreamReader::new(st) +} pub fn pipe_output( _line_prefix: &str, mut cmd: Command, @@ -170,11 +180,15 @@ pub fn pipe_output( let mut stdi = cmd.stdin.take().expect("is piped"); let stdo = cmd.stdout.take().expect("is piped"); - tokio::spawn(async move { + let join = tokio::spawn(async move { let mut z = inp; - tokio::io::copy(&mut z, &mut stdi).await.unwrap(); + tokio::io::copy(&mut z, &mut stdi).await?; + std::io::Result::Ok(()) }); - Ok(Box::pin(stdo.chain(proc_wait(cmd)))) + + Ok(Box::pin( + stdo.chain(proc_wait(cmd).chain(join_handle_to_stream(join))), + )) } pub struct CustomSpawningFileAdapter { diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 40ff205..c178ea0 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -153,7 +153,7 @@ impl GetMetadata for PostprocPageBreaks { static ref METADATA: AdapterMeta = AdapterMeta { name: "postprocpagebreaks".to_owned(), version: 1, - description: "Adds the page number to each line for an input file that specifies page breaks as ascii page break character".to_owned(), + description: "Adds the page number to each line for an input file that specifies page breaks as ascii page break character.\nMainly to be used internally by the poppler adapter.".to_owned(), recurses: false, fast_matchers: vec![FastFileMatcher::FileExtension("asciipagebreaks".to_string())], slow_matchers: None, diff --git a/src/caching_writer.rs b/src/caching_writer.rs index 0bf3ab1..048e774 100644 --- a/src/caching_writer.rs +++ b/src/caching_writer.rs @@ -9,6 +9,8 @@ use tokio::io::{AsyncRead, AsyncWriteExt}; use tokio_stream::StreamExt; use tokio_util::io::{ReaderStream, StreamReader}; +use crate::to_io_err; + type FinishHandler = dyn FnOnce((u64, Option>)) -> Result<()> + Send; /** * wrap a AsyncRead so that it is passthrough, @@ -63,7 +65,7 @@ pub fn 
async_read_and_write_to_cache<'a>( // EOF, finish! on_finish(finish) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + .map_err(to_io_err)?; }; diff --git a/src/config.rs b/src/config.rs index b350bff..df6979f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,6 +5,7 @@ use log::*; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::ffi::OsString; +use std::io::Read; use std::{fs::File, io::Write, iter::IntoIterator, path::PathBuf, str::FromStr}; use structopt::StructOpt; @@ -267,10 +268,18 @@ fn read_config_file(path_override: Option) -> Result<(String, Value)> { .unwrap_or_else(|| config_dir.join("config.jsonc")); let config_filename_str = config_filename.to_string_lossy().into_owned(); if config_filename.exists() { - let config_file_contents = std::fs::read_to_string(config_filename) - .with_context(|| format!("Could not read config file json {config_filename_str}"))?; + let config_file_contents = { + let raw = std::fs::read_to_string(config_filename).with_context(|| { + format!("Could not read config file json {config_filename_str}") + })?; + let mut s = String::new(); + json_comments::StripComments::new(raw.as_bytes()) + .read_to_string(&mut s) + .context("strip comments")?; + s + }; { - // just for error messages + // just for error messages, actual deserialization happens after merging with cmd args serde_json::from_str::(&config_file_contents).with_context(|| { format!("Error in config file {config_filename_str}: {config_file_contents}") })?; @@ -283,25 +292,18 @@ fn read_config_file(path_override: Option) -> Result<(String, Value)> { } else { // write default config std::fs::create_dir_all(config_dir)?; - let mut schemafile = File::create(config_dir.join("config.schema.json"))?; + let mut schemafile = File::create(config_dir.join("config.v1.schema.json"))?; schemafile.write_all( serde_json::to_string_pretty(&schemars::schema_for!(RgaConfig))?.as_bytes(), )?; - let mut config_json = 
serde_json::to_value(RgaConfig::default())?; - match &mut config_json { - serde_json::Value::Object(o) => { - o.insert( - "$schema".to_string(), - serde_json::Value::String("./config.schema.json".to_string()), - ); - } - _ => panic!("impos"), - } let mut configfile = File::create(config_filename)?; - configfile.write_all(serde_json::to_string_pretty(&config_json)?.as_bytes())?; - Ok((config_filename_str, config_json)) + configfile.write_all(include_str!("../doc/config.default.jsonc").as_bytes())?; + Ok(( + config_filename_str, + serde_json::Value::Object(Default::default()), + )) } } fn read_config_env() -> Result { diff --git a/src/lib.rs b/src/lib.rs index 760dd8b..3fd4675 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ mod caching_writer; pub mod config; pub mod expand; pub mod matching; -pub mod pipe; pub mod preproc; pub mod preproc_cache; pub mod recurse; @@ -64,6 +63,10 @@ pub fn print_bytes(bytes: impl Into) -> String { pretty_bytes::converter::convert(bytes.into()) } +pub fn to_io_err(e: anyhow::Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, e) +} + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/src/pipe.rs b/src/pipe.rs deleted file mode 100644 index 8317fed..0000000 --- a/src/pipe.rs +++ /dev/null @@ -1,195 +0,0 @@ -// https://github.com/arcnmx/pipe-rs/blob/master/src/lib.rs -// extended to support sending io errors - -#![deny(missing_docs)] -#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))] - -//! Synchronous in-memory pipe -//! -//! ## Example -//! -//! ``` -//! use std::thread::spawn; -//! use std::io::{Read, Write}; -//! -//! let (mut read, mut write) = ripgrep_all::pipe::pipe(); -//! -//! let message = "Hello, world!"; -//! spawn(move || write.write_all(message.as_bytes()).unwrap()); -//! -//! let mut s = String::new(); -//! read.read_to_string(&mut s).unwrap(); -//! -//! assert_eq!(&s, message); -//! 
``` - -use crossbeam_channel::{Receiver, Sender}; -use std::cmp::min; -use std::io::{self, BufRead, Read, Result, Write}; - -/// The `Read` end of a pipe (see `pipe()`) -pub struct PipeReader { - receiver: Receiver>>, - buffer: Vec, - position: usize, -} - -/// The `Write` end of a pipe (see `pipe()`) -#[derive(Clone)] -pub struct PipeWriter { - sender: Sender>>, -} - -/// Creates a synchronous memory pipe -pub fn pipe() -> (PipeReader, PipeWriter) { - let (sender, receiver) = crossbeam_channel::bounded(0); - - ( - PipeReader { - receiver, - buffer: Vec::new(), - position: 0, - }, - PipeWriter { sender }, - ) -} - -impl PipeWriter { - /// Extracts the inner `SyncSender` from the writer - pub fn into_inner(self) -> Sender>> { - self.sender - } - - /// Write any error into the pipe, will be handled as an IO error - pub fn write_err(&self, e: std::io::Error) -> Result<()> { - self.sender - .send(Err(e)) - .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")) - } -} - -impl PipeReader { - /// Extracts the inner `Receiver` from the writer, and any pending buffered data - pub fn into_inner(mut self) -> (Receiver>>, Vec) { - self.buffer.drain(..self.position); - (self.receiver, self.buffer) - } -} - -impl BufRead for PipeReader { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - while self.position >= self.buffer.len() { - match self.receiver.recv() { - // The only existing error is EOF - Err(_) => break, - Ok(Err(e)) => Err(e)?, - Ok(Ok(data)) => { - self.buffer = data; - self.position = 0; - } - } - } - - Ok(&self.buffer[self.position..]) - } - - fn consume(&mut self, amt: usize) { - debug_assert!(self.buffer.len() - self.position >= amt); - self.position += amt - } -} - -impl Read for PipeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if buf.is_empty() { - return Ok(0); - } - - let internal = self.fill_buf()?; - - let len = min(buf.len(), internal.len()); - if len > 0 { - buf[..len].copy_from_slice(&internal[..len]); - 
self.consume(len); - } - Ok(len) - } -} - -impl Write for PipeWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let data = buf.to_vec(); - - self.sender - .send(Ok(data)) - .map(|_| buf.len()) - .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::io::{Read, Write}; - use std::thread::spawn; - - #[test] - fn pipe_reader() { - let i = b"hello there"; - let mut o = Vec::with_capacity(i.len()); - let (mut r, mut w) = pipe(); - let guard = spawn(move || { - w.write_all(&i[..5]).unwrap(); - w.write_all(&i[5..]).unwrap(); - drop(w); - }); - - r.read_to_end(&mut o).unwrap(); - assert_eq!(i, &o[..]); - - guard.join().unwrap(); - } - - #[test] - fn pipe_writer_fail() { - let i = b"hi"; - let (r, mut w) = pipe(); - let guard = spawn(move || { - drop(r); - }); - - assert!(w.write_all(i).is_err()); - - guard.join().unwrap(); - } - - #[test] - fn small_reads() { - let block_cnt = 20; - const BLOCK: usize = 20; - let (mut r, mut w) = pipe(); - let guard = spawn(move || { - for _ in 0..block_cnt { - let data = &[0; BLOCK]; - w.write_all(data).unwrap(); - } - }); - - let mut buff = [0; BLOCK / 2]; - let mut read = 0; - while let Ok(size) = r.read(&mut buff) { - // 0 means EOF - if size == 0 { - break; - } - read += size; - } - assert_eq!(block_cnt * BLOCK, read); - - guard.join().unwrap(); - } -} diff --git a/src/preproc.rs b/src/preproc.rs index 156a645..e190f6a 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -105,10 +105,6 @@ async fn buf_choose_adapter(ai: AdaptInfo) -> Result { */ pub async fn rga_preproc(ai: AdaptInfo) -> Result { debug!("path (hint) to preprocess: {:?}", ai.filepath_hint); - /*todo: move if archive_recursion_depth >= config.max_archive_recursion.0 { - let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes(); - return Ok(Box::new(std::io::Cursor::new(s))); - }*/ // 
todo: figure out when using a bufreader is a good idea and when it is not // seems to be good for File::open() reads, but not sure about within archives (tar, zip) @@ -228,8 +224,16 @@ pub fn loop_adapt( })?; let s = stream! { for await file in inp { - match buf_choose_adapter(file?).await.expect("todo: handle") { + match buf_choose_adapter(file?).await? { Ret::Recurse(ai, adapter, detection_reason, _active_adapters) => { + if ai.archive_recursion_depth >= ai.config.max_archive_recursion.0 { + let s = format!("{}[rga: max archive recursion reached ({})]", ai.line_prefix, ai.archive_recursion_depth).into_bytes(); + yield Ok(AdaptInfo { + inp: Box::pin(Cursor::new(s)), + ..ai + }); + continue; + } debug!( "Chose adapter '{}' because of matcher {:?}", &adapter.metadata().name, &detection_reason