diff --git a/.gitignore b/.gitignore
index 97bec05..503de86 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 /.idea
+/.vscode/settings.json
 **/*.rs.bk
diff --git a/.vscode/settings.json b/.vscode/settings.json
deleted file mode 100644
index 23fd35f..0000000
--- a/.vscode/settings.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-    "editor.formatOnSave": true
-}
\ No newline at end of file
diff --git a/src/adapters/mod.rs b/src/adapters/mod.rs
index 59b0069..219a8d4 100644
--- a/src/adapters/mod.rs
+++ b/src/adapters/mod.rs
@@ -5,6 +5,7 @@ pub mod spawning;
 pub mod sqlite;
 pub mod tar;
 pub mod zip;
+use crate::preproc::PreprocConfig;
 use failure::*;
 use regex::{Regex, RegexSet};
 use std::io::prelude::*;
@@ -56,6 +57,7 @@ pub struct AdaptInfo<'a> {
     /// prefix every output line with this string to better indicate the file's location if it is in some archive
     pub line_prefix: &'a str,
     // pub adapt_subobject: &'a dyn Fn(AdaptInfo) -> Fallible<()>,
+    pub config: &'a mut PreprocConfig,
 }
 
 pub fn extension_to_regex(extension: &str) -> Regex {
diff --git a/src/adapters/tar.rs b/src/adapters/tar.rs
index 08938e5..ff808c9 100644
--- a/src/adapters/tar.rs
+++ b/src/adapters/tar.rs
@@ -58,6 +58,7 @@
             oup,
             line_prefix,
             archive_recursion_depth,
+            config,
             ..
         } = ai;
 
@@ -81,8 +82,9 @@
                     inp: &mut file,
                     oup,
                     line_prefix,
+                    config,
                 };
-                rga_preproc(ai2, None)?;
+                rga_preproc(ai2)?;
             }
         }
         Ok(())
diff --git a/src/adapters/zip.rs b/src/adapters/zip.rs
index 634826e..38a505c 100644
--- a/src/adapters/zip.rs
+++ b/src/adapters/zip.rs
@@ -50,6 +50,7 @@
             oup,
             line_prefix,
             archive_recursion_depth,
+            config,
             ..
         } = ai;
         loop {
@@ -67,17 +68,15 @@
                         file.compressed_size()
                     );
                     let line_prefix = &format!("{}{}: ", line_prefix, file.name());
-                    rga_preproc(
-                        AdaptInfo {
-                            filepath_hint: &file.sanitized_name(),
-                            is_real_file: false,
-                            inp: &mut file,
-                            oup,
-                            line_prefix,
-                            archive_recursion_depth,
-                        },
-                        None,
-                    )?;
+                    rga_preproc(AdaptInfo {
+                        filepath_hint: &file.sanitized_name(),
+                        is_real_file: false,
+                        inp: &mut file,
+                        oup,
+                        line_prefix,
+                        archive_recursion_depth: archive_recursion_depth + 1,
+                        config,
+                    })?;
                 }
                 Err(e) => return Err(e.into()),
             }
diff --git a/src/bin/rga-preproc.rs b/src/bin/rga-preproc.rs
index a601a9f..1026a9c 100644
--- a/src/bin/rga-preproc.rs
+++ b/src/bin/rga-preproc.rs
@@ -17,6 +17,10 @@ fn main() -> Result<(), Error> {
     let i = File::open(&path)?;
     let mut o = std::io::stdout();
 
+    let cache = match env::var("RGA_NO_CACHE") {
+        Ok(ref s) if s.len() > 0 => None,
+        Ok(_) | Err(_) => Some(rga::preproc_cache::open()?),
+    };
     let ai = AdaptInfo {
         inp: &mut BufReader::new(i),
         filepath_hint: &path,
@@ -24,12 +28,8 @@
         oup: &mut o,
         line_prefix: "",
         archive_recursion_depth: 0,
+        config: &mut PreprocConfig { cache },
     };
 
-    let cache_db = match env::var("RGA_NO_CACHE") {
-        Ok(ref s) if s.len() > 0 => None,
-        Ok(_) | Err(_) => Some(open_cache_db()?),
-    };
-
-    rga_preproc(ai, cache_db)
+    rga_preproc(ai)
 }
diff --git a/src/caching_writer.rs b/src/caching_writer.rs
index 8b61f8f..8f5bb8a 100644
--- a/src/caching_writer.rs
+++ b/src/caching_writer.rs
@@ -3,7 +3,8 @@ use std::io::Write;
 
 /**
  * wrap a writer so that it is passthrough,
- * but also the written data is compressed and written into a buffer, unless more than X bytes is written
+ * but also the written data is compressed and written into a buffer,
+ * unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
  */
 pub struct CachingWriter<W: Write> {
     max_cache_size: usize,
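The doc-comment change above describes the CachingWriter contract: writes pass through to the wrapped writer while a bounded copy is kept, and the copy is abandoned once it would exceed max_cache_size. A minimal sketch of that tee-with-bounded-cache idea, leaving out the zstd compression the real CachingWriter applies (TeeCache and its methods are illustrative, not the crate's API):

    use std::io::{self, Write};

    // Sketch only: passthrough writer that keeps an in-memory copy of
    // everything written, until the copy would exceed max_cache_size;
    // past that point the copy is dropped and writes are pure passthrough.
    struct TeeCache<W: Write> {
        inner: W,
        max_cache_size: usize,
        cache: Option<Vec<u8>>, // None once the size limit was exceeded
    }

    impl<W: Write> TeeCache<W> {
        fn new(inner: W, max_cache_size: usize) -> TeeCache<W> {
            TeeCache { inner, max_cache_size, cache: Some(Vec::new()) }
        }
        fn finish(self) -> Option<Vec<u8>> {
            self.cache // Some(copy) if everything fit, None otherwise
        }
    }

    impl<W: Write> Write for TeeCache<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let n = self.inner.write(buf)?;
            if let Some(mut cache) = self.cache.take() {
                if cache.len() + n <= self.max_cache_size {
                    cache.extend_from_slice(&buf[..n]);
                    self.cache = Some(cache); // still within the limit
                } // else: leave the cache as None, degrading to passthrough
            }
            Ok(n)
        }
        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }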
diff --git a/src/errors.rs b/src/errors.rs
deleted file mode 100644
index e69de29..0000000
diff --git a/src/lib.rs b/src/lib.rs
index f01449b..6bb280c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,6 @@
 
 pub mod adapters;
 mod caching_writer;
-pub mod errors;
 pub mod preproc;
+pub mod preproc_cache;
 pub use caching_writer::CachingWriter;
diff --git a/src/preproc.rs b/src/preproc.rs
index 397f662..9bbdb87 100644
--- a/src/preproc.rs
+++ b/src/preproc.rs
@@ -1,47 +1,24 @@
 use crate::adapters::*;
 use crate::CachingWriter;
+use failure::Fallible;
 use failure::{format_err, Error};
 use path_clean::PathClean;
+use std::convert::AsRef;
 use std::io::BufWriter;
-
 // longest compressed conversion output to save in cache
 const MAX_DB_BLOB_LEN: usize = 2_000_000;
 const ZSTD_LEVEL: i32 = 12;
 
-/// opens a LMDB cache
-pub fn open_cache_db() -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>, Error> {
-    let app_cache = cachedir::CacheDirConfig::new("rga").get_cache_dir()?;
-
-    let db_arc = rkv::Manager::singleton()
-        .write()
-        .expect("could not write db manager")
-        .get_or_create(app_cache.as_path(), |p| {
-            let mut builder = rkv::Rkv::environment_builder();
-            builder
-                .set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
-                // i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
-                // LmdbError(ReadersFull)
-                // hope it doesn't break integrity
-                .set_flags(rkv::EnvironmentFlags::NO_TLS)
-                .set_map_size(2 * 1024 * 1024 * 1024)
-                .set_max_dbs(100)
-                .set_max_readers(128);
-            rkv::Rkv::from_env(p, builder)
-        })
-        .expect("could not get/create db");
-    Ok(db_arc)
+pub struct PreprocConfig {
+    pub cache: Option<Box<dyn crate::preproc_cache::PreprocCache>>,
 }
-
 /**
  * preprocess a file as defined in `ai`.
  *
 * If a cache is passed, read/write to it.
 *
 */
-pub fn rga_preproc<'a>(
-    ai: AdaptInfo<'a>,
-    mb_db_arc: Option<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>>,
-) -> Result<(), Error> {
+pub fn rga_preproc(ai: AdaptInfo) -> Result<(), Error> {
     let adapters = adapter_matcher()?;
     let AdaptInfo {
         filepath_hint,
@@ -49,6 +26,7 @@
         inp,
         oup,
         line_prefix,
+        config,
         ..
     } = ai;
     let filename = filepath_hint
@@ -71,10 +49,9 @@
         let meta = ad.metadata();
         eprintln!("adapter: {}", &meta.name);
         let db_name = format!("{}.v{}", meta.name, meta.version);
-        if let Some(db_arc) = mb_db_arc {
+        if let Some(cache) = config.cache.as_mut() {
             let cache_key: Vec<u8> = {
                 let clean_path = filepath_hint.to_owned().clean();
-                eprintln!("clean path: {:?}", clean_path);
                 let meta = std::fs::metadata(&filepath_hint)?;
 
                 let key = (
@@ -85,24 +62,10 @@
                 bincode::serialize(&key).expect("could not serialize path") // key in the cache database
             };
 
-            let db_env = db_arc.read().unwrap();
-            let db = db_env
-                .open_single(db_name.as_str(), rkv::store::Options::create())
-                .map_err(|p| format_err!("could not open db store: {:?}", p))?;
-
-            let reader = db_env.read().expect("could not get reader");
-            let cached = db
-                .get(&reader, &cache_key)
-                .map_err(|p| format_err!("could not read from db: {:?}", p))?;
-            match cached {
-                Some(rkv::Value::Blob(cached)) => {
-                    let stdouti = std::io::stdout();
-                    zstd::stream::copy_decode(cached, stdouti.lock())?;
-                    Ok(())
-                }
-                Some(_) => Err(format_err!("Integrity: value not blob")),
-                None => {
-                    drop(reader);
+            cache.get_or_run(
+                &db_name,
+                &cache_key,
+                Box::new(|| -> Fallible<Option<Vec<u8>>> {
                     // wrapping BufWriter here gives ~10% perf boost
                     let mut compbuf =
                         BufWriter::new(CachingWriter::new(oup, MAX_DB_BLOB_LEN, ZSTD_LEVEL)?);
@@ -114,6 +77,7 @@
                         inp,
                         oup: &mut compbuf,
                         archive_recursion_depth: 0,
+                        config: &mut PreprocConfig { cache: None },
                     })?;
                     let compressed = compbuf
                         .into_inner()
@@ -122,21 +86,16 @@
                         .finish()?;
                     if let Some(cached) = compressed {
                         eprintln!("compressed len: {}", cached.len());
-
-                        {
-                            let mut writer = db_env.write().map_err(|p| {
-                                format_err!("could not open write handle to cache: {:?}", p)
-                            })?;
-                            db.put(&mut writer, &cache_key, &rkv::Value::Blob(&cached))
-                                .map_err(|p| {
-                                    format_err!("could not write to cache: {:?}", p)
-                                })?;
-                            writer.commit().unwrap();
-                        }
-                    }
+                    };
+                    Ok(None)
+                }),
+                Box::new(|cached| {
+                    let stdouti = std::io::stdout();
+                    zstd::stream::copy_decode(cached, stdouti.lock())?;
                     Ok(())
-                }
-            }
+                }),
+            )?;
+            Ok(())
         } else {
             eprintln!("adapting...");
             ad.adapt(AdaptInfo {
@@ -146,6 +105,7 @@
                 inp,
                 oup,
                 archive_recursion_depth: 0,
+                config: &mut PreprocConfig { cache: None },
             })?;
             Ok(())
         }
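The cache key built above ties each entry to the cleaned path plus the file's mtime, so a modified file no longer matches its stale cache entry. A standalone sketch of that keying scheme, assuming bincode 1.x with serde as the hunk already uses (compute_cache_key is an illustrative name, and the path_clean normalization step is omitted):

    use std::path::Path;
    use std::time::SystemTime;

    // Serialize a (path, mtime) tuple with bincode, mirroring the hunk above:
    // touching the file changes its mtime and therefore its cache key.
    fn compute_cache_key(path: &Path) -> std::io::Result<Vec<u8>> {
        let meta = std::fs::metadata(path)?;
        let key: (&Path, SystemTime) = (path, meta.modified()?);
        Ok(bincode::serialize(&key).expect("could not serialize path"))
    }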
diff --git a/src/preproc_cache.rs b/src/preproc_cache.rs
new file mode 100644
index 0000000..f041ac6
--- /dev/null
+++ b/src/preproc_cache.rs
@@ -0,0 +1,90 @@
+use failure::{format_err, Fallible};
+
+pub fn open() -> Fallible<Box<dyn PreprocCache>> {
+    Ok(Box::new(LmdbCache::open()?))
+}
+pub trait PreprocCache {
+    // possible without second lambda?
+    fn get_or_run<'a>(
+        &mut self,
+        db_name: &str,
+        key: &[u8],
+        runner: Box<dyn FnOnce() -> Fallible<Option<Vec<u8>>> + 'a>,
+        callback: Box<dyn FnOnce(&[u8]) -> Fallible<()> + 'a>,
+    ) -> Fallible<()>;
+}
+
+/// opens a LMDB cache
+fn open_cache_db() -> Fallible<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>> {
+    let app_cache = cachedir::CacheDirConfig::new("rga").get_cache_dir()?;
+
+    let db_arc = rkv::Manager::singleton()
+        .write()
+        .expect("could not write db manager")
+        .get_or_create(app_cache.as_path(), |p| {
+            let mut builder = rkv::Rkv::environment_builder();
+            builder
+                .set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
+                // i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
+                // LmdbError(ReadersFull)
+                // hope it doesn't break integrity
+                .set_flags(rkv::EnvironmentFlags::NO_TLS)
+                .set_map_size(2 * 1024 * 1024 * 1024)
+                .set_max_dbs(100)
+                .set_max_readers(128);
+            rkv::Rkv::from_env(p, builder)
+        })
+        .expect("could not get/create db");
+    Ok(db_arc)
+}
+
+pub struct LmdbCache {
+    db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv>>,
+}
+
+impl LmdbCache {
+    pub fn open() -> Fallible<LmdbCache> {
+        Ok(LmdbCache {
+            db_arc: open_cache_db()?,
+        })
+    }
+}
+impl PreprocCache for LmdbCache {
+    // possible without second lambda?
+    fn get_or_run<'a>(
+        &mut self,
+        db_name: &str,
+        key: &[u8],
+        runner: Box<dyn FnOnce() -> Fallible<Option<Vec<u8>>> + 'a>,
+        callback: Box<dyn FnOnce(&[u8]) -> Fallible<()> + 'a>,
+    ) -> Fallible<()> {
+        let db_env = self.db_arc.read().unwrap();
+        let db = db_env
+            .open_single(db_name, rkv::store::Options::create())
+            .map_err(|p| format_err!("could not open db store: {:?}", p))?;
+
+        let reader = db_env.read().expect("could not get reader");
+        let cached = db
+            .get(&reader, &key)
+            .map_err(|p| format_err!("could not read from db: {:?}", p))?;
+
+        match cached {
+            Some(rkv::Value::Blob(cached)) => {
+                callback(cached)?;
+            }
+            Some(_) => Err(format_err!("Integrity: value not blob"))?,
+            None => {
+                drop(reader);
+                if let Some(got) = runner()? {
+                    let mut writer = db_env.write().map_err(|p| {
+                        format_err!("could not open write handle to cache: {:?}", p)
+                    })?;
+                    db.put(&mut writer, &key, &rkv::Value::Blob(&got))
+                        .map_err(|p| format_err!("could not write to cache: {:?}", p))?;
+                    writer.commit()?;
+                }
+            }
+        };
+        Ok(())
+    }
+}
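To make the PreprocCache contract concrete without LMDB: on a hit the stored blob is handed to callback, and on a miss runner does the work and may hand back a blob to store. A hypothetical in-memory implementation of the trait above (MemCache is not part of the crate):

    use failure::Fallible;
    use std::collections::HashMap;

    // Hypothetical in-memory stand-in for LmdbCache, keyed like the
    // LMDB version by (store name, cache key).
    pub struct MemCache {
        blobs: HashMap<(String, Vec<u8>), Vec<u8>>,
    }

    impl PreprocCache for MemCache {
        fn get_or_run<'a>(
            &mut self,
            db_name: &str,
            key: &[u8],
            runner: Box<dyn FnOnce() -> Fallible<Option<Vec<u8>>> + 'a>,
            callback: Box<dyn FnOnce(&[u8]) -> Fallible<()> + 'a>,
        ) -> Fallible<()> {
            let k = (db_name.to_string(), key.to_vec());
            if let Some(blob) = self.blobs.get(&k) {
                return callback(blob); // hit: replay the cached blob
            }
            if let Some(blob) = runner()? {
                self.blobs.insert(k, blob); // miss: compute, store if a blob is offered
            }
            Ok(())
        }
    }

As for the "possible without second lambda?" question in the trait: with LMDB the cached blob is only borrowed from the open read transaction, so handing it to a callback presumably avoids copying it into an owned Vec just to return it.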