From 0895e7a6cfc1b7c3b681ca85214002a2e8fc331e Mon Sep 17 00:00:00 2001
From: phiresky
Date: Wed, 17 Jun 2020 11:43:47 +0200
Subject: [PATCH] continue refactoring

---
 CHANGELOG.md               |   3 +-
 src/adapters.rs            |   5 +-
 src/bin/rga-preproc.rs     |  17 +-
 src/bin/rga.rs             |   2 +-
 src/caching_writer.rs      |  80 +++++----
 src/{args.rs => config.rs} | 110 ++++++-----
 src/lib.rs                 |   4 +-
 src/preproc.rs             | 328 ++++++++++++++++++++-----------------
 src/preproc_cache.rs       | 113 ++++++-------
 src/test_utils.rs          |  12 +-
 10 files changed, 370 insertions(+), 304 deletions(-)
 rename src/{args.rs => config.rs} (92%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 70f4b4e..47c273e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,8 @@
 - add loads of debug logs and performance timings when `--debug` is used
 - better error messages via `anyhow`
 - add cross-platform rga-fzf binary
-- add a config file including schema
+- add a config file (~/.config/ripgrep-all) that is generated on first use, including schema
+- change adapter interface from `(&Read, &Write) -> ()` to `Read -> Read` to allow chaining of adapters
 
 # 0.9.6 (2020-05-19)
 
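The `Read -> Read` bullet above is the theme of the whole patch: rga_preproc now returns a reader instead of pumping bytes into a writer, so adapters nest like ordinary function composition. A toy illustration of why the new shape chains where `(&Read, &Write) -> ()` could not; the adapter names here are invented and the signatures are much simpler than rga's real `FileAdapter` trait:

```rust
use std::io::{Cursor, Read};

// Each "adapter" maps a byte stream to a byte stream.
fn uppercase(inp: Box<dyn Read>) -> Box<dyn Read> {
    struct Upper(Box<dyn Read>);
    impl Read for Upper {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let n = self.0.read(buf)?;
            buf[..n].make_ascii_uppercase();
            Ok(n)
        }
    }
    Box::new(Upper(inp))
}

fn spaces_to_dashes(inp: Box<dyn Read>) -> Box<dyn Read> {
    struct Dashes(Box<dyn Read>);
    impl Read for Dashes {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let n = self.0.read(buf)?;
            for b in &mut buf[..n] {
                if *b == b' ' {
                    *b = b'-';
                }
            }
            Ok(n)
        }
    }
    Box::new(Dashes(inp))
}

fn main() -> std::io::Result<()> {
    // With writer-based adapters this nesting would need pipes or threads;
    // with Read -> Read it is plain composition, lazily pulled by the consumer.
    let mut chained = spaces_to_dashes(uppercase(Box::new(Cursor::new("zip in tar"))));
    let mut out = String::new();
    chained.read_to_string(&mut out)?;
    assert_eq!(out, "ZIP-IN-TAR");
    Ok(())
}
```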
diff --git a/src/adapters.rs b/src/adapters.rs
index bf382fb..efc6b85 100644
--- a/src/adapters.rs
+++ b/src/adapters.rs
@@ -10,8 +10,7 @@ pub mod sqlite;
 //pub mod tesseract;
 pub mod writing;
 // pub mod zip;
-use crate::matching::*;
-use crate::preproc::PreprocConfig;
+use crate::{config::RgaConfig, matching::*};
 use anyhow::*;
 use custom::builtin_spawning_adapters;
 use custom::CustomAdapterConfig;
@@ -79,7 +78,7 @@ pub struct AdaptInfo {
     pub inp: ReadBox,
     /// prefix every output line with this string to better indicate the file's location if it is in some archive
     pub line_prefix: String,
-    pub config: PreprocConfig,
+    pub config: RgaConfig,
 }
 
 /// (enabledAdapters, disabledAdapters)
diff --git a/src/bin/rga-preproc.rs b/src/bin/rga-preproc.rs
index 4638da1..e065174 100644
--- a/src/bin/rga-preproc.rs
+++ b/src/bin/rga-preproc.rs
@@ -1,15 +1,17 @@
 use rga::adapters::*;
 use rga::preproc::*;
+use rga::print_dur;
 use ripgrep_all as rga;
 
 use anyhow::Context;
-use std::fs::File;
+use log::debug;
+use std::{fs::File, time::Instant};
 
 fn main() -> anyhow::Result<()> {
     env_logger::init();
     let mut arg_arr: Vec<std::ffi::OsString> = std::env::args_os().collect();
     let last = arg_arr.pop().expect("No filename specified");
-    let args = rga::args::parse_args(arg_arr, true)?;
+    let config = rga::config::parse_args(arg_arr, true)?;
     //clap::App::new("rga-preproc").arg(Arg::from_usage())
     let path = {
         let filepath = last;
@@ -18,20 +20,19 @@ fn main() -> anyhow::Result<()> {
 
     let i = File::open(&path).context("Specified input file not found")?;
     let mut o = std::io::stdout();
-    let cache = if args.no_cache {
-        None
-    } else {
-        Some(rga::preproc_cache::open().context("could not open cache")?)
-    };
     let ai = AdaptInfo {
         inp: Box::new(i),
         filepath_hint: path,
         is_real_file: true,
         line_prefix: "".to_string(),
        archive_recursion_depth: 0,
-        config: PreprocConfig { cache, args },
+        config,
     };
+
+    let start = Instant::now();
     let mut oup = rga_preproc(ai).context("during preprocessing")?;
+    debug!("finding and starting adapter took {}", print_dur(start));
     std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?;
+    debug!("running adapter took {} total", print_dur(start));
     Ok(())
 }
diff --git a/src/bin/rga.rs b/src/bin/rga.rs
index 5fd5b11..4be2aec 100644
--- a/src/bin/rga.rs
+++ b/src/bin/rga.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use rga::adapters::spawning::map_exe_error;
 use rga::adapters::*;
-use rga::args::*;
+use rga::config::{split_args, RgaConfig};
 use rga::matching::*;
 use rga::print_dur;
 use ripgrep_all as rga;
diff --git a/src/caching_writer.rs b/src/caching_writer.rs
index 6d8f042..e71e591 100644
--- a/src/caching_writer.rs
+++ b/src/caching_writer.rs
@@ -1,32 +1,39 @@
 use anyhow::Result;
 use log::*;
-use std::io::Write;
+use std::io::{BufReader, Read, Write};
 
 /**
  * wrap a writer so that it is passthrough,
  * but also the written data is compressed and written into a buffer,
  * unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
  */
-pub struct CachingWriter<W: Write> {
+pub struct CachingReader<R: Read> {
     max_cache_size: usize,
     zstd_writer: Option<zstd::stream::write::Encoder<Vec<u8>>>,
-    out: W,
+    inp: R,
     bytes_written: u64,
+    on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
 }
-impl<W: Write> CachingWriter<W> {
-    pub fn new(out: W, max_cache_size: usize, compression_level: i32) -> Result<CachingWriter<W>> {
-        Ok(CachingWriter {
-            out,
+impl<R: Read> CachingReader<R> {
+    pub fn new(
+        inp: R,
+        max_cache_size: usize,
+        compression_level: i32,
+        on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
+    ) -> Result<CachingReader<R>> {
+        Ok(CachingReader {
+            inp,
             max_cache_size,
             zstd_writer: Some(zstd::stream::write::Encoder::new(
                 Vec::new(),
                 compression_level,
             )?),
             bytes_written: 0,
+            on_finish,
         })
     }
-    pub fn finish(self) -> std::io::Result<(u64, Option<Vec<u8>>)> {
-        if let Some(writer) = self.zstd_writer {
+    pub fn finish(&mut self) -> std::io::Result<(u64, Option<Vec<u8>>)> {
+        if let Some(writer) = self.zstd_writer.take() {
             let res = writer.finish()?;
             if res.len() <= self.max_cache_size {
                 return Ok((self.bytes_written, Some(res)));
@@ -34,32 +41,37 @@ impl<W: Write> CachingWriter<W> {
         }
         Ok((self.bytes_written, None))
     }
-}
-impl<W: Write> Write for CachingWriter<W> {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let written_bytes = match self.zstd_writer.as_mut() {
-            Some(writer) => {
-                let wrote = writer.write(buf)?;
-                let compressed_len = writer.get_ref().len();
-                trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
-                if compressed_len > self.max_cache_size {
-                    debug!("cache longer than max, dropping");
-                    //writer.finish();
-                    self.zstd_writer.take().unwrap().finish()?;
-                }
-                self.out.write_all(&buf[0..wrote])?;
-                Ok(wrote)
-            }
-            None => self.out.write(buf),
-        }?;
-        self.bytes_written += written_bytes as u64;
-        Ok(written_bytes)
-    }
-    fn flush(&mut self) -> std::io::Result<()> {
-        debug!("flushing");
+    fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
         if let Some(writer) = self.zstd_writer.as_mut() {
-            writer.flush()?;
+            let wrote = writer.write(buf)?;
+            let compressed_len = writer.get_ref().len();
+            trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
+            if compressed_len > self.max_cache_size {
+                debug!("cache longer than max, dropping");
+                //writer.finish();
+                self.zstd_writer.take().unwrap().finish()?;
+            }
+        }
+        Ok(())
+    }
+}
+impl<R: Read> Read for CachingReader<R> {
+    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
+        match self.inp.read(&mut buf) {
+            Ok(0) => {
+                // move out of box, replace with noop lambda
+                let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
+                // EOF, finish!
+                (on_finish)(self.finish()?)
+                    .map(|()| 0)
+                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
+            }
+            Ok(read_bytes) => {
+                self.write_to_compressed(&buf[0..read_bytes])?;
+                self.bytes_written += read_bytes as u64;
+                Ok(read_bytes)
+            }
+            Err(e) => Err(e),
         }
-        self.out.flush()
     }
 }
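The rewrite above turns the old write-side tee into a read-side one: every chunk handed to the caller is also fed to the zstd encoder (dropped once it exceeds `max_cache_size`), and a callback receives the collected result exactly once at EOF. Stripped of compression and the size cap, the pattern looks like this minimal std-only sketch; the names here are invented, not rga's:

```rust
use std::io::{self, Read};

struct TeeReader<R: Read> {
    inner: R,
    side: Vec<u8>, // stands in for the zstd encoder plus size cap
    on_finish: Option<Box<dyn FnOnce(&[u8]) + Send>>,
}

impl<R: Read> Read for TeeReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        if n == 0 {
            // EOF: Option::take ensures the callback fires only once,
            // the same job the mem::replace with a noop lambda does above.
            if let Some(f) = self.on_finish.take() {
                f(&self.side);
            }
        } else {
            self.side.extend_from_slice(&buf[..n]);
        }
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let mut tee = TeeReader {
        inner: &b"hello world"[..],
        side: Vec::new(),
        on_finish: Some(Box::new(|side| println!("captured {} bytes", side.len()))),
    };
    let mut out = Vec::new();
    tee.read_to_end(&mut out)?; // the consumer drives the tee; callback fires at EOF
    assert_eq!(out, b"hello world");
    Ok(())
}
```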
diff --git a/src/args.rs b/src/config.rs
similarity index 92%
rename from src/args.rs
rename to src/config.rs
index ea8a661..7858ec1 100644
--- a/src/args.rs
+++ b/src/config.rs
@@ -41,7 +41,23 @@ impl Default for MaxArchiveRecursion {
     }
 }
 
-#[derive(JsonSchema, Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
+#[derive(JsonSchema, Debug, Serialize, Deserialize, Clone, PartialEq, FromStr)]
+pub struct CachePath(pub String);
+
+impl ToString for CachePath {
+    fn to_string(&self) -> String {
+        self.0.to_string()
+    }
+}
+impl Default for CachePath {
+    fn default() -> Self {
+        let pd = project_dirs().expect("could not get cache path");
+        let app_cache = pd.cache_dir();
+        CachePath(app_cache.to_str().expect("cache path not utf8").to_owned())
+    }
+}
+
+#[derive(JsonSchema, Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
 pub struct CacheMaxBlobLen(pub usize);
 
 impl ToString for CacheMaxBlobLen {
@@ -96,18 +112,6 @@ impl FromStr for CacheMaxBlobLen {
     usage = "rga [RGA OPTIONS] [RG OPTIONS] PATTERN [PATH ...]"
 )]
 pub struct RgaConfig {
-    /// Disable caching of results
-    ///
-    /// By default, rga caches the extracted text, if it is small enough,
-    /// to a database in ~/.cache/rga on Linux,
-    /// ~/Library/Caches/rga on macOS,
-    /// or C:\Users\username\AppData\Local\rga on Windows.
-    /// This way, repeated searches on the same set of files will be much faster.
-    /// If you pass this flag, all caching will be disabled.
-    #[serde(default, skip_serializing_if = "is_default")]
-    #[structopt(long = "--rga-no-cache")]
-    pub no_cache: bool,
-
     /// Use more accurate but slower matching by mime type
     ///
     /// By default, rga will match files using file extensions.
@@ -133,31 +137,9 @@ pub struct RgaConfig {
     )]
     pub adapters: Vec<String>,
 
-    /// Max compressed size to cache
-    ///
-    /// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached and recomputed every time. Allowed suffixes: k M G
     #[serde(default, skip_serializing_if = "is_default")]
-    #[structopt(
-        default_value,
-        long = "--rga-cache-max-blob-len",
-        hidden_short_help = true,
-        require_equals = true,
-        // parse(try_from_str = parse_readable_bytes_str)
-    )]
-    pub cache_max_blob_len: CacheMaxBlobLen,
-
-    /// ZSTD compression level to apply to adapter outputs before storing in cache db
-    ///
-    /// Ranges from 1 - 22
-    #[serde(default, skip_serializing_if = "is_default")]
-    #[structopt(
-        default_value,
-        long = "--rga-cache-compression-level",
-        hidden_short_help = true,
-        require_equals = true,
-        help = ""
-    )]
-    pub cache_compression_level: CacheCompressionLevel,
+    #[structopt(flatten)]
+    pub cache: CacheConfig,
 
     /// Maximum nestedness of archives to recurse into
     #[serde(default, skip_serializing_if = "is_default")]
@@ -210,6 +192,60 @@ pub struct RgaConfig {
     pub rg_version: bool,
 }
 
+#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default, Clone, PartialEq)]
+pub struct CacheConfig {
+    /// Disable caching of results
+    ///
+    /// By default, rga caches the extracted text, if it is small enough,
+    /// to a database in ~/.cache/rga on Linux,
+    /// ~/Library/Caches/rga on macOS,
+    /// or C:\Users\username\AppData\Local\rga on Windows.
+    /// This way, repeated searches on the same set of files will be much faster.
+    /// If you pass this flag, all caching will be disabled.
+    #[serde(default, skip_serializing_if = "is_default")]
+    #[structopt(long = "--rga-no-cache")]
+    pub disabled: bool,
+
+    /// Max compressed size to cache
+    ///
+    /// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached but recomputed every time. Allowed suffixes: k M G
+    #[serde(default, skip_serializing_if = "is_default")]
+    #[structopt(
+        default_value,
+        long = "--rga-cache-max-blob-len",
+        hidden_short_help = true,
+        require_equals = true,
+        // parse(try_from_str = parse_readable_bytes_str)
+    )]
+    pub max_blob_len: CacheMaxBlobLen,
+
+    /// ZSTD compression level to apply to adapter outputs before storing in cache db
+    ///
+    /// Ranges from 1 - 22
+    #[serde(default, skip_serializing_if = "is_default")]
+    #[structopt(
+        default_value,
+        long = "--rga-cache-compression-level",
+        hidden_short_help = true,
+        require_equals = true,
+        help = ""
+    )]
+    pub compression_level: CacheCompressionLevel,
+
+    /// Path to store the cache db in
+    #[serde(default, skip_serializing_if = "is_default")]
+    #[structopt(
+        default_value,
+        long = "--rga-cache-path",
+        hidden_short_help = true,
+        require_equals = true,
+        help = ""
+    )]
+    pub path: CachePath,
+}
+
 static RGA_CONFIG: &str = "RGA_CONFIG";
 
 use serde_json::Value;
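For readers unfamiliar with it, the `#[structopt(flatten)]` used above hoists the nested struct's flags into the parent command while keeping them grouped in code, which is how the three cache options move into `CacheConfig` without changing the CLI. A freestanding sketch with made-up flag names, assuming structopt 0.3:

```rust
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
struct CacheOpts {
    /// plays the role of --rga-no-cache
    #[structopt(long)]
    no_cache: bool,
    /// plays the role of --rga-cache-path
    #[structopt(long, default_value = "/tmp/cache")]
    cache_path: String,
}

#[derive(StructOpt, Debug)]
struct Opt {
    #[structopt(long)]
    accurate: bool,
    // flatten: CacheOpts' flags appear on Opt's command line directly
    #[structopt(flatten)]
    cache: CacheOpts,
}

fn main() {
    let opt = Opt::from_iter(vec!["app", "--accurate", "--no-cache"]);
    assert!(opt.cache.no_cache);
    println!("{:?}", opt);
}
```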
diff --git a/src/lib.rs b/src/lib.rs
index 685e382..ea74fd3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,8 +3,8 @@
 #![feature(specialization)]
 
 pub mod adapters;
-pub mod args;
 mod caching_writer;
+pub mod config;
 pub mod matching;
 pub mod pipe;
 pub mod preproc;
@@ -13,7 +13,7 @@ pub mod preproc_cache;
 pub mod test_utils;
 use anyhow::Context;
 use anyhow::Result;
-pub use caching_writer::CachingWriter;
+pub use caching_writer::CachingReader;
 use directories_next::ProjectDirs;
 use std::time::Instant;
diff --git a/src/preproc.rs b/src/preproc.rs
index e45a63a..f7df935 100644
--- a/src/preproc.rs
+++ b/src/preproc.rs
@@ -1,24 +1,18 @@
 use crate::adapters::*;
-use crate::args::RgaConfig;
 use crate::matching::*;
-use crate::{print_bytes, print_dur, CachingWriter};
+use crate::{
+    config::RgaConfig,
+    preproc_cache::{LmdbCache, PreprocCache},
+    print_bytes, print_dur, CachingReader,
+};
 use anyhow::*;
 use log::*;
+use path_clean::PathClean;
+use std::convert::TryInto;
+use std::io::{BufRead, BufReader};
 
-
-use std::io::BufReader;
-
-use std::{
-    sync::{Arc, RwLock},
-    time::Instant,
-};
-
-#[derive(Clone)]
-pub struct PreprocConfig {
-    pub cache: Option<Arc<RwLock<dyn PreprocCache>>>,
-    pub args: RgaConfig,
-}
+use std::{path::PathBuf, rc::Rc, time::Instant};
 /**
  * preprocess a file as defined in `ai`.
  *
@@ -36,164 +30,52 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
         ..
     } = ai;
     debug!("path (hint) to preprocess: {:?}", filepath_hint);
-    let PreprocConfig { cache: _, args } = config;
-    let filtered_adapters = get_adapters_filtered(args.custom_adapters.clone(), &args.adapters)?;
-    let adapters = adapter_matcher(&filtered_adapters, args.accurate)?;
+    let filtered_adapters =
+        get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
+    let adapters = adapter_matcher(&filtered_adapters, config.accurate)?;
     let filename = filepath_hint
         .file_name()
         .ok_or_else(|| format_err!("Empty filename"))?;
     debug!("Archive recursion depth: {}", archive_recursion_depth);
-    if archive_recursion_depth >= args.max_archive_recursion.0 {
+    if archive_recursion_depth >= config.max_archive_recursion.0 {
         let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes();
         return Ok(Box::new(std::io::Cursor::new(s)));
     }
 
     // todo: figure out when using a bufreader is a good idea and when it is not
     // seems to be good for File::open() reads, but not sure about within archives (tar, zip)
-    let inp = BufReader::with_capacity(1 << 16, inp);
+    let mut inp = BufReader::with_capacity(1 << 16, inp);
 
-    let mimetype = None; /*if args.accurate {
-        let buf = inp.fill_buf()?; // fill but do not consume!
-        let mimetype = tree_magic::from_u8(buf);
-        debug!("mimetype: {:?}", mimetype);
-        Some(mimetype)
-    } else {
-        None
-    };*/
+    let mimetype = if config.accurate {
+        let buf = inp.fill_buf()?; // fill but do not consume!
+        let mimetype = tree_magic::from_u8(buf);
+        debug!("mimetype: {:?}", mimetype);
+        Some(mimetype)
+    } else {
+        None
+    };
     let adapter = adapters(FileMeta {
         mimetype,
         lossy_filename: filename.to_string_lossy().to_string(),
     });
     match adapter {
-        Some((adapter, detection_reason)) => {
-            let meta = adapter.metadata();
-            debug!(
-                "Chose adapter '{}' because of matcher {:?}",
-                &meta.name, &detection_reason
-            );
-            eprintln!(
-                "{} adapter: {}",
-                filepath_hint.to_string_lossy(),
-                &meta.name
-            );
-            let _db_name = format!("{}.v{}", meta.name, meta.version);
-            /*if let Some(cache) = cache.as_mut() {
-                let cache_key: Vec<u8> = {
-                    let clean_path = filepath_hint.to_owned().clean();
-                    let meta = std::fs::metadata(&filepath_hint)?;
-                    let modified = meta.modified().expect("weird OS that can't into mtime");
-
-                    if adapter.metadata().recurses {
-                        let key = (
-                            filtered_adapters
-                                .iter()
-                                .map(|a| (a.metadata().name.clone(), a.metadata().version))
-                                .collect::<Vec<_>>(),
-                            clean_path,
-                            modified,
-                        );
-                        debug!("Cache key (with recursion): {:?}", key);
-                        bincode::serialize(&key).expect("could not serialize path")
-                    } else {
-                        let key = (
-                            adapter.metadata().name.clone(),
-                            adapter.metadata().version,
-                            clean_path,
-                            modified,
-                        );
-                        debug!("Cache key (no recursion): {:?}", key);
-                        bincode::serialize(&key).expect("could not serialize path")
-                    }
-                };
-                cache.write().unwrap().get_or_run(
-                    &db_name,
-                    &cache_key,
-                    &adapter.metadata().name,
-                    Box::new(|| -> Result<Option<Vec<u8>>> {
-                        // wrapping BufWriter here gives ~10% perf boost
-                        let mut compbuf = BufWriter::new(CachingWriter::new(
-                            oup,
-                            args.cache_max_blob_len.0.try_into().unwrap(),
-                            args.cache_compression_level.0.try_into().unwrap(),
-                        )?);
-                        debug!("adapting with caching...");
-                        adapter
-                            .adapt(
-                                AdaptInfo {
-                                    line_prefix,
-                                    filepath_hint,
-                                    is_real_file,
-                                    inp,
-                                    oup: &mut compbuf,
-                                    archive_recursion_depth,
-                                    config: PreprocConfig { cache: None, args },
-                                },
-                                &detection_reason,
-                            )
-                            .with_context(|| {
-                                format!(
-                                    "adapting {} via {} failed",
-                                    filepath_hint.to_string_lossy(),
-                                    meta.name
-                                )
-                            })?;
-                        let (uncompressed_size, compressed) = compbuf
-                            .into_inner()
-                            .map_err(|_| anyhow!("could not finish zstd"))? // can't use with_context here
-                            .finish()?;
-                        debug!(
-                            "uncompressed output: {}",
-                            print_bytes(uncompressed_size as f64)
-                        );
-                        if let Some(cached) = compressed {
-                            debug!("compressed output: {}", print_bytes(cached.len() as f64));
-                            Ok(Some(cached))
-                        } else {
-                            Ok(None)
-                        }
-                    }),
-                    Box::new(|cached| {
-                        let stdouti = std::io::stdout();
-                        zstd::stream::copy_decode(cached, stdouti.lock())?;
-                        Ok(())
-                    }),
-                )?;
-                Ok(())
-            } else { */
-            // no cache arc - probably within archive
-            debug!("adapting without caching...");
-            let start = Instant::now();
-            let oread = adapter
-                .adapt(
-                    AdaptInfo {
-                        line_prefix,
-                        filepath_hint: filepath_hint.clone(),
-                        is_real_file,
-                        inp: Box::new(inp),
-                        archive_recursion_depth,
-                        config: PreprocConfig { cache: None, args },
-                    },
-                    &detection_reason,
-                )
-                .with_context(|| {
-                    format!(
-                        "adapting {} via {} without caching failed",
-                        filepath_hint.to_string_lossy(),
-                        meta.name
-                    )
-                })?;
-            debug!(
-                "running adapter {} took {}",
-                adapter.metadata().name,
-                print_dur(start)
-            );
-            Ok(oread)
-            /* }*/
-        }
+        Some((adapter, detection_reason)) => run_adapter(
+            AdaptInfo {
+                filepath_hint,
+                is_real_file,
+                inp: Box::new(inp),
+                line_prefix,
+                config,
+                archive_recursion_depth,
+            },
+            adapter,
+            detection_reason,
+            &filtered_adapters,
+        ),
         None => {
             // allow passthrough if the file is in an archive or accurate matching is enabled
             // otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
-            let allow_cat = !is_real_file || args.accurate;
+            let allow_cat = !is_real_file || config.accurate;
             if allow_cat {
                 Ok(Box::new(inp))
             } else {
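The re-enabled mimetype branch above hinges on `BufRead::fill_buf` peeking: it returns the internal buffer (up to the 64 KiB capacity configured here) without consuming it, so the adapter that runs afterwards still sees the stream from byte 0. A self-contained demonstration of that property, with a magic-bytes check standing in for tree_magic:

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

fn main() -> std::io::Result<()> {
    let data = b"%PDF-1.5 rest of the file...".to_vec();
    let mut inp = BufReader::with_capacity(1 << 16, Cursor::new(data));

    // Peek: fill_buf loads and returns buffered bytes, but without a matching
    // consume() call the read position does not advance.
    let sniffed_pdf = inp.fill_buf()?.starts_with(b"%PDF");
    assert!(sniffed_pdf);

    // A later full read still starts at byte 0.
    let mut all = Vec::new();
    inp.read_to_end(&mut all)?;
    assert!(all.starts_with(b"%PDF"));
    Ok(())
}
```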
@@ -205,3 +87,141 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
         }
     }
 }
+
+fn run_adapter(
+    ai: AdaptInfo,
+    adapter: Rc<dyn FileAdapter>,
+    detection_reason: SlowMatcher,
+    filtered_adapters: &Vec<Rc<dyn FileAdapter>>,
+) -> Result<ReadBox> {
+    let AdaptInfo {
+        filepath_hint,
+        is_real_file,
+        inp,
+        line_prefix,
+        config,
+        archive_recursion_depth,
+        ..
+    } = ai;
+    let meta = adapter.metadata();
+    debug!(
+        "Chose adapter '{}' because of matcher {:?}",
+        &meta.name, &detection_reason
+    );
+    eprintln!(
+        "{} adapter: {}",
+        filepath_hint.to_string_lossy(),
+        &meta.name
+    );
+    let db_name = format!("{}.v{}", meta.name, meta.version);
+    let cache_compression_level = config.cache.compression_level;
+    let cache_max_blob_len = config.cache.max_blob_len;
+
+    if let Some(mut cache) = LmdbCache::open(&config.cache)? {
+        let cache_key: Vec<u8> = {
+            let clean_path = filepath_hint.to_owned().clean();
+            let meta = std::fs::metadata(&filepath_hint)?;
+            let modified = meta.modified().expect("weird OS that can't into mtime");
+
+            if adapter.metadata().recurses {
+                let key = (
+                    filtered_adapters
+                        .iter()
+                        .map(|a| (a.metadata().name.clone(), a.metadata().version))
+                        .collect::<Vec<_>>(),
+                    clean_path,
+                    modified,
+                );
+                debug!("Cache key (with recursion): {:?}", key);
+                bincode::serialize(&key).expect("could not serialize path")
+            } else {
+                let key = (
+                    adapter.metadata().name.clone(),
+                    adapter.metadata().version,
+                    clean_path,
+                    modified,
+                );
+                debug!("Cache key (no recursion): {:?}", key);
+                bincode::serialize(&key).expect("could not serialize path")
+            }
+        };
+        // let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
+        let cached = cache.get(&db_name, &cache_key)?;
+        match cached {
+            Some(cached) => Ok(Box::new(
+                zstd::stream::read::Decoder::new(std::io::Cursor::new(cached))
+                    .context("could not create zstd decoder")?,
+            )),
+            None => {
+                debug!("cache MISS, running adapter");
+                debug!("adapting with caching...");
+                let inp = adapter
+                    .adapt(
+                        AdaptInfo {
+                            line_prefix,
+                            filepath_hint: filepath_hint.clone(),
+                            is_real_file,
+                            inp: Box::new(inp),
+                            archive_recursion_depth,
+                            config,
+                        },
+                        &detection_reason,
+                    )
+                    .with_context(|| {
+                        format!(
+                            "adapting {} via {} failed",
+                            filepath_hint.to_string_lossy(),
+                            meta.name
+                        )
+                    })?;
+                let inp = CachingReader::new(
+                    inp,
+                    cache_max_blob_len.0.try_into().unwrap(),
+                    cache_compression_level.0.try_into().unwrap(),
+                    Box::new(move |(uncompressed_size, compressed)| {
+                        debug!(
+                            "uncompressed output: {}",
+                            print_bytes(uncompressed_size as f64)
+                        );
+                        if let Some(cached) = compressed {
+                            debug!("compressed output: {}", print_bytes(cached.len() as f64));
+                            cache.set(&db_name, &cache_key, &cached)?
+                        }
+                        Ok(())
+                    }),
+                )?;
+
+                Ok(Box::new(inp))
+            }
+        }
+    } else {
+        // no cache arc - probably within archive
+        debug!("adapting without caching...");
+        let start = Instant::now();
+        let oread = adapter
+            .adapt(
+                AdaptInfo {
+                    line_prefix,
+                    filepath_hint: filepath_hint.clone(),
+                    is_real_file,
+                    inp: Box::new(inp),
+                    archive_recursion_depth,
+                    config,
+                },
+                &detection_reason,
+            )
+            .with_context(|| {
+                format!(
+                    "adapting {} via {} without caching failed",
+                    filepath_hint.to_string_lossy(),
+                    meta.name
+                )
+            })?;
+        debug!(
+            "running adapter {} took {}",
+            adapter.metadata().name,
+            print_dur(start)
+        );
+        Ok(oread)
+    }
+}
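The cache key logic above is worth spelling out: for a recursing adapter (archives), the key bakes in the name and version of every enabled adapter, because inner files may be handled by any of them; otherwise only the matched adapter's identity is used, plus the cleaned path and mtime, so touching a file or upgrading an adapter invalidates its entries. A freestanding sketch of the same idea, assuming bincode 1.x and serde; the adapter names are illustrative:

```rust
use std::time::SystemTime;

fn main() -> anyhow::Result<()> {
    let clean_path = "/docs/report.pdf".to_string();
    let modified = SystemTime::UNIX_EPOCH; // stands in for fs::metadata(..).modified()?

    // non-recursing case: (name, version, path, mtime)
    let single_key = bincode::serialize(&("poppler".to_string(), 1i32, &clean_path, modified))?;

    // recursing case: every enabled adapter participates in the key
    let all_adapters = vec![("poppler".to_string(), 1i32), ("tar".to_string(), 1i32)];
    let recursive_key = bincode::serialize(&(all_adapters, &clean_path, modified))?;

    assert_ne!(single_key, recursive_key);
    Ok(())
}
```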
diff --git a/src/preproc_cache.rs b/src/preproc_cache.rs
index b2cb13a..4f6e37c 100644
--- a/src/preproc_cache.rs
+++ b/src/preproc_cache.rs
@@ -1,37 +1,37 @@
-use crate::{print_bytes, print_dur, project_dirs};
+use crate::{config::CacheConfig, print_bytes, print_dur, project_dirs};
 use anyhow::{format_err, Context, Result};
 use log::*;
 use std::{
     fmt::Display,
+    path::Path,
     sync::{Arc, RwLock},
     time::Instant,
 };
 
-pub fn open() -> Result<Arc<RwLock<dyn PreprocCache>>> {
-    Ok(Arc::new(RwLock::new(LmdbCache::open()?)))
-}
 pub trait PreprocCache: Send + Sync {
-    // possible without second lambda?
+    /*/// gets cache at specified key.
+    /// if cache hit, return the resulting data
+    /// else, run the given lambda, and store its result in the cache if present
     fn get_or_run<'a>(
         &mut self,
         db_name: &str,
         key: &[u8],
-        adapter_name: &str,
+        debug_name: &str,
         runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
-        callback: Box<dyn FnOnce(&[u8]) -> Result<()> + 'a>,
-    ) -> Result<()>;
+    ) -> Result<Option<Vec<u8>>>;*/
+
+    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
+    fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
 }
 
 /// opens a LMDB cache
-fn open_cache_db() -> Result<Arc<RwLock<rkv::Rkv>>> {
-    let pd = project_dirs()?;
-    let app_cache = pd.cache_dir();
-    std::fs::create_dir_all(app_cache)?;
+fn open_cache_db(path: &Path) -> Result<Arc<RwLock<rkv::Rkv>>> {
+    std::fs::create_dir_all(path)?;
     rkv::Manager::singleton()
         .write()
         .map_err(|_| format_err!("could not write cache db manager"))?
-        .get_or_create(app_cache, |p| {
+        .get_or_create(path, |p| {
             let mut builder = rkv::Rkv::environment_builder();
             builder
                 .set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
@@ -53,10 +53,14 @@ pub struct LmdbCache {
 }
 
 impl LmdbCache {
-    pub fn open() -> Result<LmdbCache> {
-        Ok(LmdbCache {
-            db_arc: open_cache_db()?,
-        })
+    pub fn open(config: &CacheConfig) -> Result<Option<LmdbCache>> {
+        if config.disabled {
+            return Ok(None);
+        }
+        let path = Path::new(&config.path.0);
+        Ok(Some(LmdbCache {
+            db_arc: open_cache_db(&path)?,
+        }))
     }
 }
 
@@ -70,27 +74,22 @@ impl Display for RkvErrWrap {
 impl std::error::Error for RkvErrWrap {}
 
 impl PreprocCache for LmdbCache {
-    // possible without second lambda?
-    fn get_or_run<'a>(
-        &mut self,
-        db_name: &str,
-        key: &[u8],
-        adapter_name: &str,
-        runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
-        callback: Box<dyn FnOnce(&[u8]) -> Result<()> + 'a>,
-    ) -> Result<()> {
+    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
         let start = Instant::now();
-        let db_env = self.db_arc.read().unwrap();
+        let db_env = self
+            .db_arc
+            .read()
+            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
         let db = db_env
             .open_single(db_name, rkv::store::Options::create())
             .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not open cache db store"))?;
+            .context("could not open cache db store")?;
 
         let reader = db_env.read().expect("could not get reader");
         let cached = db
             .get(&reader, &key)
             .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not read from db"))?;
+            .context("could not read from db")?;
 
         match cached {
             Some(rkv::Value::Blob(cached)) => {
@@ -99,34 +98,38 @@ impl PreprocCache for LmdbCache {
                     print_bytes(cached.len() as f64)
                 );
                 debug!("reading from cache took {}", print_dur(start));
-                callback(cached)?;
+                Ok(Some(Vec::from(cached)))
             }
             Some(_) => Err(format_err!("Integrity: value not blob"))?,
-            None => {
-                debug!("cache MISS, running adapter");
-                drop(reader);
-                let runner_res = runner()?;
-                debug!("running adapter {} took {}", adapter_name, print_dur(start));
-                let start = Instant::now();
-                if let Some(got) = runner_res {
-                    debug!("writing {} to cache", print_bytes(got.len() as f64));
-                    let mut writer = db_env
-                        .write()
-                        .map_err(RkvErrWrap)
-                        .with_context(|| format_err!("could not open write handle to cache"))?;
-                    db.put(&mut writer, &key, &rkv::Value::Blob(&got))
-                        .map_err(RkvErrWrap)
-                        .with_context(|| format_err!("could not write to cache"))?;
-                    writer
-                        .commit()
-                        .map_err(RkvErrWrap)
-                        .with_context(|| format!("could not write cache"))?;
-                    debug!("writing to cache took {}", print_dur(start));
-                } else {
-                    debug!("not caching output");
-                }
-            }
-        };
+            None => Ok(None),
+        }
+    }
+    fn set(&mut self, db_name: &str, key: &[u8], got: &[u8]) -> Result<()> {
+        let start = Instant::now();
+        debug!("writing {} to cache", print_bytes(got.len() as f64));
+        let db_env = self
+            .db_arc
+            .read()
+            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
+
+        let db = db_env
+            .open_single(db_name, rkv::store::Options::create())
+            .map_err(RkvErrWrap)
+            .context("could not open cache db store")?;
+
+        let mut writer = db_env
+            .write()
+            .map_err(RkvErrWrap)
+            .with_context(|| format_err!("could not open write handle to cache"))?;
+
+        db.put(&mut writer, &key, &rkv::Value::Blob(&got))
+            .map_err(RkvErrWrap)
+            .with_context(|| format_err!("could not write to cache"))?;
+        writer
+            .commit()
+            .map_err(RkvErrWrap)
+            .with_context(|| format!("could not write cache"))?;
+        debug!("writing to cache took {}", print_dur(start));
         Ok(())
     }
 }
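The net effect of this rewrite is that `PreprocCache` shrinks from a callback-driven `get_or_run` to plain `get`/`set`, with the hit/miss branching moved to the caller (`run_adapter` above). A toy in-memory implementation, purely for illustration, shows the caller-side shape without LMDB:

```rust
use anyhow::Result;
use std::collections::HashMap;

trait PreprocCache {
    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
    fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
}

#[derive(Default)]
struct MemCache(HashMap<(String, Vec<u8>), Vec<u8>>);

impl PreprocCache for MemCache {
    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self.0.get(&(db_name.to_string(), key.to_vec())).cloned())
    }
    fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()> {
        self.0.insert((db_name.to_string(), key.to_vec()), value.to_vec());
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut cache = MemCache::default();
    // caller-side hit/miss branching, as in run_adapter:
    let out = match cache.get("poppler.v1", b"cache-key")? {
        Some(hit) => hit, // rga streams this through a zstd decoder
        None => {
            let produced = b"adapter output".to_vec(); // rga runs the adapter here
            cache.set("poppler.v1", b"cache-key", &produced)?;
            produced
        }
    };
    assert_eq!(out, b"adapter output");
    Ok(())
}
```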
diff --git a/src/test_utils.rs b/src/test_utils.rs
index 09f03b8..32983fd 100644
--- a/src/test_utils.rs
+++ b/src/test_utils.rs
@@ -1,12 +1,9 @@
 use crate::{
     adapters::{AdaptInfo, ReadBox},
-    args::RgaConfig,
+    config::RgaConfig,
     matching::{FastMatcher, SlowMatcher},
-    preproc::PreprocConfig,
-};
-use std::{
-    path::{Path, PathBuf},
 };
+use std::path::{Path, PathBuf};
 
 pub fn test_data_dir() -> PathBuf {
     let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -22,10 +19,7 @@ pub fn simple_adapt_info(filepath: &Path, inp: ReadBox) -> (AdaptInfo, SlowMatch
         archive_recursion_depth: 0,
         inp,
         line_prefix: "PREFIX:".to_string(),
-        config: PreprocConfig {
-            cache: None,
-            args: RgaConfig::default(),
-        },
+        config: RgaConfig::default(),
     },
     FastMatcher::FileExtension(filepath.extension().unwrap().to_string_lossy().into_owned())
         .into(),
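Finally, the test helper no longer needs a `PreprocConfig` wrapper; a default `RgaConfig` is enough to build an `AdaptInfo`. A hypothetical test using the simplified helper (the fixture name is made up):

```rust
use anyhow::Result;
use ripgrep_all::test_utils::{simple_adapt_info, test_data_dir};
use std::fs::File;

#[test]
fn simple_adapt_info_uses_default_config() -> Result<()> {
    let path = test_data_dir().join("hello.txt"); // hypothetical fixture
    let (ai, _matcher) = simple_adapt_info(&path, Box::new(File::open(&path)?));
    // values set by the helper, per the diff above
    assert_eq!(ai.archive_recursion_depth, 0);
    assert_eq!(ai.line_prefix, "PREFIX:");
    Ok(())
}
```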