From cde0e209d29195bd8253e4cc0c00d2300aabd586 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 29 Oct 2022 20:54:05 +0200 Subject: [PATCH] partial move to async --- Cargo.lock | 272 +++++++++++++++++++++++++++++++++++ Cargo.toml | 6 + src/adapters.rs | 11 +- src/adapters/custom.rs | 12 +- src/adapters/postproc.rs | 28 ++-- src/adapters/spawning.rs | 68 ++++++--- src/adapters/zip.rs | 2 +- src/caching_writer.rs | 80 +++++++---- src/preproc.rs | 304 ++++++++++++++++++--------------------- src/recurse.rs | 28 +++- 10 files changed, 573 insertions(+), 238 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97458c4..68744df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,41 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -135,6 +170,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" + [[package]] name = "bzip2" version = "0.4.3" @@ -598,6 +639,55 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" + +[[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" + +[[package]] +name = "futures-task" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" + +[[package]] +name = "futures-util" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -845,6 +935,16 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" @@ -895,6 +995,18 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.42.0", +] + [[package]] name = "nom" version = "7.1.1" @@ -969,6 +1081,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_threads" version = "0.1.6" @@ -1014,6 +1136,29 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.42.0", +] + [[package]] name = "password-hash" version = "0.4.2" @@ -1065,6 +1210,18 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.26" @@ -1180,7 +1337,10 @@ name = "ripgrep_all" version = "0.9.7-alpha.0" dependencies = [ "anyhow", + "async-compression", + "async-stream", "bincode", + "bytes", "bzip2", "chrono", "clap 4.0.18", @@ -1213,6 +1373,9 @@ dependencies = [ "structopt", "tar", "tempfile", + "tokio", + "tokio-stream", + "tokio-util", "tree_magic_mini", "xz2", "zip", @@ -1392,6 +1555,15 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "size_format" version = "1.0.2" @@ -1402,12 +1574,31 @@ dependencies = [ "num", ] +[[package]] +name = "slab" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +[[package]] +name = "socket2" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -1595,6 +1786,87 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "hashbrown", + "pin-project-lite", + "slab", + "tokio", + "tracing", +] + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", +] + [[package]] name = "tree_magic_mini" version = "3.0.3" diff --git a/Cargo.toml b/Cargo.toml index 57ce72a..2a21593 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,12 @@ dyn-clone = "1.0.2" dyn-clonable = "0.9.0" zip = "0.6.3" owning_ref = "0.4.1" +tokio = { version = "1.21.2", features = ["full"] } +async-compression = { version = "0.3.15", features = ["tokio", "zstd"] } +tokio-stream = { version = "0.1.11", features = ["io-util", "tokio-util"] } +async-stream = "0.3.3" +bytes = "1.2.1" +tokio-util = { version = "0.7.4", features = ["io", "full"] } [dev-dependencies] ctor = "0.1.20" diff --git a/src/adapters.rs b/src/adapters.rs index d0dfa68..72a69b5 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -1,28 +1,29 @@ pub mod custom; // pub mod decompress; // pub mod ffmpeg; -pub mod postproc; +// pub mod postproc; // pub mod pdfpages; pub mod spawning; // pub mod sqlite; // pub mod tar; // pub mod tesseract; // pub mod writing; -pub mod zip; +// pub mod zip; use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*}; use anyhow::*; use custom::builtin_spawning_adapters; use custom::CustomAdapterConfig; use log::*; +use tokio::io::AsyncRead; use std::borrow::Cow; use std::collections::HashMap; -use std::io::prelude::*; use std::iter::Iterator; use std::path::PathBuf; +use std::pin::Pin; use std::rc::Rc; -pub type ReadBox<'a> = Box; +pub type ReadBox<'a> = Pin>; pub struct AdapterMeta { /// unique short name of this adapter (a-z0-9 only) pub name: String, @@ -115,7 +116,7 @@ pub fn get_all_adapters(custom_adapters: Option>) -> Ad let internal_adapters: Vec> = vec![ //Rc::new(ffmpeg::FFmpegAdapter::new()), - Rc::new(zip::ZipAdapter::new()), + // Rc::new(zip::ZipAdapter::new()), //Rc::new(decompress::DecompressAdapter::new()), // Rc::new(tar::TarAdapter::new()), //Rc::new(sqlite::SqliteAdapter::new()), diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 54acf81..ad40570 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -166,8 +166,8 @@ impl SpawningFileAdapterTrait for CustomSpawningFileAdapter { fn command( &self, filepath_hint: &std::path::Path, - mut command: std::process::Command, - ) -> Result { + mut command: tokio::process::Command, + ) -> Result { command.args( self.args .iter() @@ -218,10 +218,10 @@ mod test { use super::*; use crate::test_utils::*; use anyhow::Result; - use std::fs::File; + use tokio::fs::File; - #[test] - fn poppler() -> Result<()> { + #[tokio::test] + async fn poppler() -> Result<()> { let adapter = builtin_spawning_adapters .iter() .find(|e| e.name == "poppler") @@ -231,7 +231,7 @@ mod test { let filepath = test_data_dir().join("short.pdf"); - let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); + let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let r = adapter.adapt(a, &d)?; let o = adapted_to_vec(r)?; assert_eq!( diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index 7164ee7..68c5b94 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -5,10 +5,10 @@ use anyhow::Context; use anyhow::Result; use encoding_rs_io::DecodeReaderBytesBuilder; +use tokio::io::AsyncRead; use std::{ cmp::min, - io::{BufRead, BufReader, Read}, }; use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter}; @@ -16,11 +16,11 @@ use crate::adapted_iter::{AdaptedFilesIterBox, SingleAdaptedFileAsIter}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; /** pass through, except adding \n at the end */ -pub struct EnsureEndsWithNewline { +pub struct EnsureEndsWithNewline { inner: R, added_newline: bool, } -impl EnsureEndsWithNewline { +impl EnsureEndsWithNewline { pub fn new(r: R) -> EnsureEndsWithNewline { EnsureEndsWithNewline { inner: r, @@ -28,8 +28,8 @@ impl EnsureEndsWithNewline { } } } -impl Read for EnsureEndsWithNewline { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { +impl AsyncRead for EnsureEndsWithNewline { + fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result { match self.inner.read(buf) { Ok(0) => { if self.added_newline { @@ -47,7 +47,7 @@ impl Read for EnsureEndsWithNewline { } struct ByteReplacer where - R: Read, + R:AsyncRead, { inner: R, next_read: Vec, @@ -57,7 +57,7 @@ where impl ByteReplacer where - R: Read, + R: AsyncRead, { fn output_next(&mut self, buf: &mut [u8], buf_valid_until: usize, replacement: &[u8]) -> usize { let after_part1 = Vec::from(&buf[1..buf_valid_until]); @@ -80,11 +80,11 @@ where } } -impl Read for ByteReplacer +impl AsyncRead for ByteReplacer where - R: Read, + R: AsyncRead, { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + fn poll_read(&mut self, buf: &mut [u8]) -> std::io::Result { let read = if self.next_read.len() > 0 { let count = std::cmp::min(self.next_read.len(), buf.len()); buf[0..count].copy_from_slice(&self.next_read[0..count]); @@ -158,10 +158,10 @@ impl Read for ReadErr { } }*/ -pub fn postproc_encoding<'a, R: Read + 'a>( +pub fn postproc_encoding<'a, R: AsyncRead + 'a>( line_prefix: &str, inp: R, -) -> Result> { +) -> Result> { // TODO: parse these options from ripgrep's configuration let encoding = None; // detect bom but usually assume utf8 let bom_sniffing = true; @@ -202,7 +202,7 @@ pub fn postproc_encoding<'a, R: Read + 'a>( )) } -pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> impl Read { +pub fn postproc_prefix(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { let line_prefix = line_prefix.to_string(); // clone since we need it later ByteReplacer { inner: inp, @@ -212,7 +212,7 @@ pub fn postproc_prefix(line_prefix: &str, inp: impl Read) -> impl Read { } } -pub fn postproc_pagebreaks(line_prefix: &str, inp: impl Read) -> impl Read { +pub fn postproc_pagebreaks(line_prefix: &str, inp: impl AsyncRead) -> impl AsyncRead { let line_prefix = line_prefix.to_string(); // clone since let mut page_count = 1; diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs index e9e6d4f..c93c13d 100644 --- a/src/adapters/spawning.rs +++ b/src/adapters/spawning.rs @@ -1,14 +1,17 @@ - use crate::adapted_iter::SingleAdaptedFileAsIter; use super::*; use anyhow::Result; use log::*; -use std::process::Command; -use std::process::{Child, Stdio}; -use std::{io::prelude::*, path::Path}; use crate::adapters::FileAdapter; +use std::future::Future; +use std::path::Path; +use std::process::{ExitStatus, Stdio}; +use std::task::Poll; +use tokio::io::AsyncReadExt; +use tokio::process::Child; +use tokio::process::Command; // TODO: don't separate the trait and the struct pub trait SpawningFileAdapterTrait: GetMetadata { @@ -50,11 +53,29 @@ pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { /** waits for a process to finish, returns an io error if the process failed */ struct ProcWaitReader { - proce: Child, + proce: Pin>>>, } -impl Read for ProcWaitReader { - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { - let status = self.proce.wait()?; +impl AsyncRead for ProcWaitReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.proce.as_mut().poll(cx) { + std::task::Poll::Ready(x) => { + let x = x?; + if x.success() { + Poll::Ready(std::io::Result::Ok(())) + } else { + Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + format_err!("subprocess failed: {:?}", x), + ))) + } + } + Poll::Pending => std::task::Poll::Pending, + } + /*let status = self.proce.wait(); if status.success() { std::io::Result::Ok(0) } else { @@ -62,13 +83,13 @@ impl Read for ProcWaitReader { std::io::ErrorKind::Other, format_err!("subprocess failed: {:?}", status), )) - } + }*/ } } pub fn pipe_output<'a>( _line_prefix: &str, mut cmd: Command, - inp: &mut (dyn Read + 'a), + inp: ReadBox<'a>, exe_name: &str, help: &str, ) -> Result> { @@ -81,10 +102,14 @@ pub fn pipe_output<'a>( let stdo = cmd.stdout.take().expect("is piped"); // TODO: deadlocks since this is run in the same thread as the thing reading from stdout of the process - std::io::copy(inp, &mut stdi)?; - drop(stdi); + tokio::task::spawn_local(async { + tokio::pin!(inp); + tokio::io::copy(&mut inp, &mut stdi).await; + }); - Ok(Box::new(stdo.chain(ProcWaitReader { proce: cmd }))) + Ok(Box::pin(stdo.chain(ProcWaitReader { + proce: Box::pin(cmd.wait()), + }))) } impl FileAdapter for SpawningFileAdapter { @@ -109,7 +134,7 @@ impl FileAdapter for SpawningFileAdapter { .command(&filepath_hint, cmd) .with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?; debug!("executing {:?}", cmd); - let output = pipe_output(&line_prefix, cmd, &mut inp, self.inner.get_exe(), "")?; + let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?; Ok(Box::new(SingleAdaptedFileAsIter::new(AdaptInfo { filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable inp: output, @@ -122,14 +147,16 @@ impl FileAdapter for SpawningFileAdapter { } } - #[cfg(test)] mod test { use std::io::Cursor; - use crate::{adapters::custom::CustomAdapterConfig, test_utils::{adapted_to_vec, simple_adapt_info}}; use super::*; use crate::adapters::FileAdapter; + use crate::{ + adapters::custom::CustomAdapterConfig, + test_utils::{adapted_to_vec, simple_adapt_info}, + }; #[test] fn streaming() { @@ -143,7 +170,7 @@ mod test { mimetypes: None, match_only_by_mime: None, binary: "sed".to_string(), - args: vec!["s/e/u/g".to_string()] + args: vec!["s/e/u/g".to_string()], }; let adapter = adapter.to_adapter(); @@ -159,10 +186,13 @@ mod test { let input = format!("{0}{0}{0}{0}", input); let input = format!("{0}{0}{0}{0}", input); let input = format!("{0}{0}{0}{0}", input); - let (a, d) = simple_adapt_info(&Path::new("foo.txt"), Box::new(Cursor::new(input.as_bytes()))); + let (a, d) = simple_adapt_info( + &Path::new("foo.txt"), + Box::new(Cursor::new(input.as_bytes())), + ); let output = adapter.adapt(a, &d).unwrap(); let oup = adapted_to_vec(output).unwrap(); println!("output: {}", String::from_utf8_lossy(&oup)); } -} \ No newline at end of file +} diff --git a/src/adapters/zip.rs b/src/adapters/zip.rs index b9527fe..a5bf4d7 100644 --- a/src/adapters/zip.rs +++ b/src/adapters/zip.rs @@ -85,7 +85,7 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> { #[cfg(test)] mod test { use super::*; - use crate::{recurse::RecursingConcattyReader, test_utils::*}; + use crate::test_utils::*; fn create_zip(fname: &str, content: &str, add_inner: bool) -> Result> { use ::zip::write::FileOptions; diff --git a/src/caching_writer.rs b/src/caching_writer.rs index 74c36f1..9adce24 100644 --- a/src/caching_writer.rs +++ b/src/caching_writer.rs @@ -1,20 +1,24 @@ +use std::{pin::Pin, task::Poll}; + use anyhow::Result; use log::*; -use std::io::{Read, Write}; +use tokio::{io::{AsyncRead, AsyncWriteExt, AsyncWrite}, pin}; +use async_compression::tokio::write::ZstdEncoder; /** * wrap a writer so that it is passthrough, * but also the written data is compressed and written into a buffer, * unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough. */ -pub struct CachingReader { +pub struct CachingReader { max_cache_size: usize, - zstd_writer: Option>>, - inp: R, + // set to none if the size goes over the limit + zstd_writer: Option>>, + inp: Pin>, bytes_written: u64, on_finish: Box>)) -> Result<()> + Send>, } -impl CachingReader { +impl CachingReader { pub fn new( inp: R, max_cache_size: usize, @@ -22,56 +26,78 @@ impl CachingReader { on_finish: Box>)) -> Result<()> + Send>, ) -> Result> { Ok(CachingReader { - inp, + inp: Box::pin(inp), max_cache_size, - zstd_writer: Some(zstd::stream::write::Encoder::new( + zstd_writer: Some(ZstdEncoder::with_quality( Vec::new(), - compression_level, - )?), + async_compression::Level::Precise(compression_level as u32), + )), bytes_written: 0, on_finish, }) } - pub fn finish(&mut self) -> std::io::Result<(u64, Option>)> { + pub fn finish(&mut self, cx: &mut std::task::Context<'_>) -> std::io::Result<(u64, Option>)> { if let Some(writer) = self.zstd_writer.take() { - let res = writer.finish()?; + pin!(writer); + writer.as_mut().poll_shutdown(cx)?; + let res = writer.into_inner(); if res.len() <= self.max_cache_size { return Ok((self.bytes_written, Some(res))); } } Ok((self.bytes_written, None)) } - fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> { + async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> { if let Some(writer) = self.zstd_writer.as_mut() { - let wrote = writer.write(buf)?; + let wrote = writer.write_all(buf).await?; let compressed_len = writer.get_ref().len(); - trace!("wrote {} to zstd, len now {}", wrote, compressed_len); + trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len); if compressed_len > self.max_cache_size { debug!("cache longer than max, dropping"); //writer.finish(); - self.zstd_writer.take().unwrap().finish()?; + self.zstd_writer.take(); } } Ok(()) } } -impl Read for CachingReader { - fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { - match self.inp.read(&mut buf) { - Ok(0) => { - // move out of box, replace with noop lambda - let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(()))); - // EOF, finish! - (on_finish)(self.finish()?) - .map(|()| 0) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) +impl AsyncRead for CachingReader where R: AsyncRead { + + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let old_filled = buf.filled(); + match self.inp.as_mut().poll_read(cx, &mut buf) { + /*Ok(0) => { + } Ok(read_bytes) => { self.write_to_compressed(&buf[0..read_bytes])?; self.bytes_written += read_bytes as u64; Ok(read_bytes) - } - Err(e) => Err(e), + }*/ + Poll::Ready(rdy) => { + if let Ok(()) = &rdy { + let slice = buf.filled(); + let read_bytes = slice.len() - old_filled.len(); + if read_bytes == 0 { + // EOF + // move out of box, replace with noop lambda + let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(()))); + // EOF, finish! + (on_finish)(self.finish(cx)?) + .map(|()| 0) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + } else { + self.write_to_compressed(&slice[old_filled.len()..]); + self.bytes_written += read_bytes as u64; + } + } + Poll::Ready(rdy) + }, + Poll::Pending => Poll::Pending, } } } diff --git a/src/preproc.rs b/src/preproc.rs index 5d3122c..d80db15 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -1,4 +1,6 @@ use crate::adapters::*; +use crate::config::RgaConfig; +use crate::recurse::concat_read_streams; use crate::{matching::*, recurse::RecursingConcattyReader}; use crate::{ preproc_cache::{LmdbCache, PreprocCache}, @@ -7,47 +9,32 @@ use crate::{ use anyhow::*; use log::*; use path_clean::PathClean; -use postproc::PostprocPrefix; +// use postproc::PostprocPrefix; use std::convert::TryInto; - -use std::io::{BufRead, BufReader}; +use std::path::Path; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::io::{AsyncBufRead, AsyncRead}; use std::{rc::Rc, time::Instant}; -/** - * preprocess a file as defined in `ai`. - * - * If a cache is passed, read/write to it. - * - */ -pub fn rga_preproc(ai: AdaptInfo) -> Result { - let AdaptInfo { - filepath_hint, - is_real_file, - inp, - line_prefix, - config, - archive_recursion_depth, - postprocess, - } = ai; - debug!("path (hint) to preprocess: {:?}", filepath_hint); - let filtered_adapters = - get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?; - let adapters = adapter_matcher(&filtered_adapters, config.accurate)?; + +type ActiveAdapters = Vec<(Rc)>; + +async fn choose_adapter( + config: &RgaConfig, + filepath_hint: &Path, + archive_recursion_depth: i32, + mut inp: &mut (impl AsyncBufRead + Unpin), +) -> Result, FileMatcher, ActiveAdapters)>> { + let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?; + let adapters = adapter_matcher(&active_adapters, config.accurate)?; let filename = filepath_hint .file_name() .ok_or_else(|| format_err!("Empty filename"))?; debug!("Archive recursion depth: {}", archive_recursion_depth); - if archive_recursion_depth >= config.max_archive_recursion.0 { - let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes(); - return Ok(Box::new(std::io::Cursor::new(s))); - } - - // todo: figure out when using a bufreader is a good idea and when it is not - // seems to be good for File::open() reads, but not sure about within archives (tar, zip) - let mut inp = BufReader::with_capacity(1 << 16, inp); let mimetype = if config.accurate { - let buf = inp.fill_buf()?; // fill but do not consume! + let buf = inp.fill_buf().await?; // fill but do not consume! let mimetype = tree_magic::from_u8(buf); debug!("mimetype: {:?}", mimetype); Some(mimetype) @@ -58,52 +45,105 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result { mimetype, lossy_filename: filename.to_string_lossy().to_string(), }); - let (adapter, detection_reason) = match adapter { - Some((a, d)) => (a, d), + Ok(adapter.map(|e| (e.0, e.1, active_adapters))) +} +/** + * preprocess a file as defined in `ai`. + * + * If a cache is passed, read/write to it. + * + */ +pub async fn rga_preproc(ai: AdaptInfo<'_>) -> Result> { + debug!("path (hint) to preprocess: {:?}", ai.filepath_hint); + /*todo: move if archive_recursion_depth >= config.max_archive_recursion.0 { + let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes(); + return Ok(Box::new(std::io::Cursor::new(s))); + }*/ + + // todo: figure out when using a bufreader is a good idea and when it is not + // seems to be good for File::open() reads, but not sure about within archives (tar, zip) + let mut inp = BufReader::with_capacity(1 << 16, ai.inp); + let adapter = choose_adapter( + &ai.config, + &ai.filepath_hint, + ai.archive_recursion_depth, + &mut inp, + ) + .await?; + let (adapter, detection_reason, active_adapters) = match adapter { + Some((a, d, e)) => (a, d, e), None => { // allow passthrough if the file is in an archive or accurate matching is enabled // otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us - let allow_cat = !is_real_file || config.accurate; + let allow_cat = !ai.is_real_file || ai.config.accurate; if allow_cat { - if postprocess { - ( + if ai.postprocess { + panic!("not implemented"); + /* ( Rc::new(PostprocPrefix {}) as Rc, FileMatcher::Fast(FastFileMatcher::FileExtension("default".to_string())), // todo: separate enum value for this - ) + )*/ } else { - return Ok(Box::new(inp)); + return Ok(Box::pin(inp)); } } else { return Err(format_err!( "No adapter found for file {:?}, passthrough disabled.", - filename + ai.filepath_hint + .file_name() + .ok_or_else(|| format_err!("Empty filename"))? )); } } }; - let path_hint_copy = filepath_hint.clone(); - run_adapter( + let path_hint_copy = ai.filepath_hint.clone(); + run_adapter_recursively( AdaptInfo { - filepath_hint, - is_real_file, - inp: Box::new(inp), - line_prefix, - config, - archive_recursion_depth, - postprocess, + inp: Box::pin(inp), + ..ai }, adapter, detection_reason, - &filtered_adapters, + active_adapters, ) + .await .with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy())) } -fn run_adapter<'a>( +fn compute_cache_key( + filepath_hint: &Path, + adapter: &dyn FileAdapter, + active_adapters: ActiveAdapters, +) -> Result> { + let clean_path = filepath_hint.to_owned().clean(); + let meta = std::fs::metadata(&filepath_hint) + .with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?; + let modified = meta.modified().expect("weird OS that can't into mtime"); + + if adapter.metadata().recurses { + let active_adapters_cache_key = active_adapters + .iter() + .map(|a| (a.metadata().name.clone(), a.metadata().version)) + .collect::>(); + let key = (active_adapters_cache_key, clean_path, modified); + debug!("Cache key (with recursion): {:?}", key); + bincode::serialize(&key).context("could not serialize path") + } else { + let key = ( + adapter.metadata().name.clone(), + adapter.metadata().version, + clean_path, + modified, + ); + debug!("Cache key (no recursion): {:?}", key); + bincode::serialize(&key).context("could not serialize path") + } +} +async fn run_adapter_recursively<'a>( ai: AdaptInfo<'a>, adapter: Rc, detection_reason: FileMatcher, - filtered_adapters: &Vec>, + active_adapters: ActiveAdapters, ) -> Result> { let AdaptInfo { filepath_hint, @@ -134,116 +174,56 @@ fn run_adapter<'a>( None }; - if let Some(mut cache) = cache { - let cache_key: Vec = { - let clean_path = filepath_hint.to_owned().clean(); - let meta = std::fs::metadata(&filepath_hint).with_context(|| { - format!("reading metadata for {}", filepath_hint.to_string_lossy()) - })?; - let modified = meta.modified().expect("weird OS that can't into mtime"); - - if adapter.metadata().recurses { - let key = ( - filtered_adapters - .iter() - .map(|a| (a.metadata().name.clone(), a.metadata().version)) - .collect::>(), - clean_path, - modified, - ); - debug!("Cache key (with recursion): {:?}", key); - bincode::serialize(&key).expect("could not serialize path") - } else { - let key = ( - adapter.metadata().name.clone(), - adapter.metadata().version, - clean_path, - modified, - ); - debug!("Cache key (no recursion): {:?}", key); - bincode::serialize(&key).expect("could not serialize path") - } - }; - // let dbg_ctx = format!("adapter {}", &adapter.metadata().name); - let cached = cache.get(&db_name, &cache_key)?; - match cached { - Some(cached) => Ok(Box::new( - zstd::stream::read::Decoder::new(std::io::Cursor::new(cached)) - .context("could not create zstd decoder")?, - )), - None => { - debug!("cache MISS, running adapter"); - debug!("adapting with caching..."); - let inp = adapter - .adapt( - AdaptInfo { - line_prefix, - filepath_hint: filepath_hint.clone(), - is_real_file, - inp: Box::new(inp), - archive_recursion_depth, - config, - postprocess, - }, - &detection_reason, - ) - .with_context(|| { - format!( - "adapting {} via {} failed", - filepath_hint.to_string_lossy(), - meta.name - ) - })?; - let inp = RecursingConcattyReader::concat(inp)?; - let inp = CachingReader::new( - inp, - cache_max_blob_len.0.try_into().unwrap(), - cache_compression_level.0.try_into().unwrap(), - Box::new(move |(uncompressed_size, compressed)| { - debug!( - "uncompressed output: {}", - print_bytes(uncompressed_size as f64) - ); - if let Some(cached) = compressed { - debug!("compressed output: {}", print_bytes(cached.len() as f64)); - cache.set(&db_name, &cache_key, &cached)? - } - Ok(()) - }), - )?; - - Ok(Box::new(inp)) - } - } - } else { - // no cache arc - probably within archive - debug!("adapting without caching..."); - let start = Instant::now(); - let oread = adapter - .adapt( - AdaptInfo { - line_prefix, - filepath_hint: filepath_hint.clone(), - is_real_file, - inp, - archive_recursion_depth, - config, - postprocess, - }, - &detection_reason, - ) - .with_context(|| { - format!( - "adapting {} via {} without caching failed", - filepath_hint.to_string_lossy(), - meta.name + let mut cache = cache.context("No cache?")?; + let cache_key: Vec = compute_cache_key(&filepath_hint, adapter.as_ref(), active_adapters)?; + // let dbg_ctx = format!("adapter {}", &adapter.metadata().name); + let cached = cache.get(&db_name, &cache_key)?; + match cached { + Some(cached) => Ok(Box::pin( + async_compression::tokio::bufread::ZstdDecoder::new(std::io::Cursor::new(cached)), + )), + None => { + debug!("cache MISS, running adapter"); + debug!("adapting with caching..."); + let inp = adapter + .adapt( + AdaptInfo { + line_prefix, + filepath_hint: filepath_hint.clone(), + is_real_file, + inp, + archive_recursion_depth, + config, + postprocess, + }, + &detection_reason, ) - })?; - debug!( - "running adapter {} took {}", - adapter.metadata().name, - print_dur(start) - ); - Ok(RecursingConcattyReader::concat(oread)?) + .with_context(|| { + format!( + "adapting {} via {} failed", + filepath_hint.to_string_lossy(), + meta.name + ) + })?; + let inp = concat_read_streams(inp); + let inp = CachingReader::new( + inp, + cache_max_blob_len.0.try_into().unwrap(), + cache_compression_level.0.try_into().unwrap(), + Box::new(move |(uncompressed_size, compressed)| { + debug!( + "uncompressed output: {}", + print_bytes(uncompressed_size as f64) + ); + if let Some(cached) = compressed { + debug!("compressed output: {}", print_bytes(cached.len() as f64)); + cache.set(&db_name, &cache_key, &cached)? + } + Ok(()) + }), + )?; + + Ok(Box::pin(inp)) + } } } diff --git a/src/recurse.rs b/src/recurse.rs index a51beec..517c3e5 100644 --- a/src/recurse.rs +++ b/src/recurse.rs @@ -1,14 +1,33 @@ +use tokio::io::AsyncRead; +use tokio_util::io::{ReaderStream, StreamReader}; + use crate::preproc::rga_preproc; use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; - -use std::io::Read; +use async_stream::stream; +use tokio_stream::Stream; +use bytes::Bytes; +use tokio_stream::StreamExt; pub struct RecursingConcattyReader<'a> { inp: AdaptedFilesIterBox<'a>, cur: Option>, } +pub fn concat_read_streams( + mut input: AdaptedFilesIterBox<'_>, +) -> ReadBox<'_> { + let s = stream! { + while let Some(output) = input.next() { + let mut stream = ReaderStream::new(output.inp); + while let Some(bytes) = stream.next().await { + yield bytes; + } + } + }; + Box::pin(StreamReader::new(s)) +} +/* impl<'a> RecursingConcattyReader<'a> { - pub fn concat(inp: AdaptedFilesIterBox<'a>) -> anyhow::Result> { + pub fn concat(inp: AdaptedFilesIterBox<'a>) -> anyhow::Result> { let mut r = RecursingConcattyReader { inp, cur: None }; r.ascend()?; Ok(Box::new(r)) @@ -28,7 +47,7 @@ impl<'a> RecursingConcattyReader<'a> { Ok(()) } } -impl<'a> Read for RecursingConcattyReader<'a> { +impl<'a> AsyncRead for RecursingConcattyReader<'a> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match &mut self.cur { None => Ok(0), // last file ended @@ -45,3 +64,4 @@ impl<'a> Read for RecursingConcattyReader<'a> { } } } +*/ \ No newline at end of file