pass around config object

This commit is contained in:
phiresky 2019-06-07 19:00:24 +02:00
parent 957e06fc64
commit 9c5efa1970
11 changed files with 137 additions and 85 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/target /target
/.idea /.idea
/.vscode/settings.json
**/*.rs.bk **/*.rs.bk

View File

@ -1,3 +0,0 @@
{
"editor.formatOnSave": true
}

View File

@ -5,6 +5,7 @@ pub mod spawning;
pub mod sqlite; pub mod sqlite;
pub mod tar; pub mod tar;
pub mod zip; pub mod zip;
use crate::preproc::PreprocConfig;
use failure::*; use failure::*;
use regex::{Regex, RegexSet}; use regex::{Regex, RegexSet};
use std::io::prelude::*; use std::io::prelude::*;
@ -56,6 +57,7 @@ pub struct AdaptInfo<'a> {
/// prefix every output line with this string to better indicate the file's location if it is in some archive /// prefix every output line with this string to better indicate the file's location if it is in some archive
pub line_prefix: &'a str, pub line_prefix: &'a str,
// pub adapt_subobject: &'a dyn Fn(AdaptInfo) -> Fallible<()>, // pub adapt_subobject: &'a dyn Fn(AdaptInfo) -> Fallible<()>,
pub config: &'a mut PreprocConfig,
} }
pub fn extension_to_regex(extension: &str) -> Regex { pub fn extension_to_regex(extension: &str) -> Regex {

View File

@ -58,6 +58,7 @@ impl FileAdapter for TarAdapter {
oup, oup,
line_prefix, line_prefix,
archive_recursion_depth, archive_recursion_depth,
config,
.. ..
} = ai; } = ai;
@ -81,8 +82,9 @@ impl FileAdapter for TarAdapter {
inp: &mut file, inp: &mut file,
oup, oup,
line_prefix, line_prefix,
config,
}; };
rga_preproc(ai2, None)?; rga_preproc(ai2)?;
} }
} }
Ok(()) Ok(())

View File

@ -50,6 +50,7 @@ impl FileAdapter for ZipAdapter {
oup, oup,
line_prefix, line_prefix,
archive_recursion_depth, archive_recursion_depth,
config,
.. ..
} = ai; } = ai;
loop { loop {
@ -67,17 +68,15 @@ impl FileAdapter for ZipAdapter {
file.compressed_size() file.compressed_size()
); );
let line_prefix = &format!("{}{}: ", line_prefix, file.name()); let line_prefix = &format!("{}{}: ", line_prefix, file.name());
rga_preproc( rga_preproc(AdaptInfo {
AdaptInfo {
filepath_hint: &file.sanitized_name(), filepath_hint: &file.sanitized_name(),
is_real_file: false, is_real_file: false,
inp: &mut file, inp: &mut file,
oup, oup,
line_prefix, line_prefix,
archive_recursion_depth, archive_recursion_depth: archive_recursion_depth + 1,
}, config,
None, })?;
)?;
} }
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }

View File

@ -17,6 +17,10 @@ fn main() -> Result<(), Error> {
let i = File::open(&path)?; let i = File::open(&path)?;
let mut o = std::io::stdout(); let mut o = std::io::stdout();
let cache = match env::var("RGA_NO_CACHE") {
Ok(ref s) if s.len() > 0 => None,
Ok(_) | Err(_) => Some(rga::preproc_cache::open()?),
};
let ai = AdaptInfo { let ai = AdaptInfo {
inp: &mut BufReader::new(i), inp: &mut BufReader::new(i),
filepath_hint: &path, filepath_hint: &path,
@ -24,12 +28,8 @@ fn main() -> Result<(), Error> {
oup: &mut o, oup: &mut o,
line_prefix: "", line_prefix: "",
archive_recursion_depth: 0, archive_recursion_depth: 0,
config: &mut PreprocConfig { cache },
}; };
let cache_db = match env::var("RGA_NO_CACHE") { rga_preproc(ai)
Ok(ref s) if s.len() > 0 => None,
Ok(_) | Err(_) => Some(open_cache_db()?),
};
rga_preproc(ai, cache_db)
} }

View File

@ -3,7 +3,8 @@ use std::io::Write;
/** /**
* wrap a writer so that it is passthrough, * wrap a writer so that it is passthrough,
* but also the written data is compressed and written into a buffer, unless more than X bytes is written * but also the written data is compressed and written into a buffer,
* unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
*/ */
pub struct CachingWriter<W: Write> { pub struct CachingWriter<W: Write> {
max_cache_size: usize, max_cache_size: usize,

View File

View File

@ -2,6 +2,6 @@
pub mod adapters; pub mod adapters;
mod caching_writer; mod caching_writer;
pub mod errors;
pub mod preproc; pub mod preproc;
pub mod preproc_cache;
pub use caching_writer::CachingWriter; pub use caching_writer::CachingWriter;

View File

@ -1,47 +1,24 @@
use crate::adapters::*; use crate::adapters::*;
use crate::CachingWriter; use crate::CachingWriter;
use failure::Fallible;
use failure::{format_err, Error}; use failure::{format_err, Error};
use path_clean::PathClean; use path_clean::PathClean;
use std::convert::AsRef;
use std::io::BufWriter; use std::io::BufWriter;
// longest compressed conversion output to save in cache // longest compressed conversion output to save in cache
const MAX_DB_BLOB_LEN: usize = 2_000_000; const MAX_DB_BLOB_LEN: usize = 2_000_000;
const ZSTD_LEVEL: i32 = 12; const ZSTD_LEVEL: i32 = 12;
/// opens a LMDB cache pub struct PreprocConfig {
pub fn open_cache_db() -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>, Error> { pub cache: Option<Box<dyn crate::preproc_cache::PreprocCache>>,
let app_cache = cachedir::CacheDirConfig::new("rga").get_cache_dir()?;
let db_arc = rkv::Manager::singleton()
.write()
.expect("could not write db manager")
.get_or_create(app_cache.as_path(), |p| {
let mut builder = rkv::Rkv::environment_builder();
builder
.set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
// i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
// LmdbError(ReadersFull)
// hope it doesn't break integrity
.set_flags(rkv::EnvironmentFlags::NO_TLS)
.set_map_size(2 * 1024 * 1024 * 1024)
.set_max_dbs(100)
.set_max_readers(128);
rkv::Rkv::from_env(p, builder)
})
.expect("could not get/create db");
Ok(db_arc)
} }
/** /**
* preprocess a file as defined in `ai`. * preprocess a file as defined in `ai`.
* *
* If a cache is passed, read/write to it. * If a cache is passed, read/write to it.
* *
*/ */
pub fn rga_preproc<'a>( pub fn rga_preproc(ai: AdaptInfo) -> Result<(), Error> {
ai: AdaptInfo<'a>,
mb_db_arc: Option<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>>,
) -> Result<(), Error> {
let adapters = adapter_matcher()?; let adapters = adapter_matcher()?;
let AdaptInfo { let AdaptInfo {
filepath_hint, filepath_hint,
@ -49,6 +26,7 @@ pub fn rga_preproc<'a>(
inp, inp,
oup, oup,
line_prefix, line_prefix,
config,
.. ..
} = ai; } = ai;
let filename = filepath_hint let filename = filepath_hint
@ -71,10 +49,9 @@ pub fn rga_preproc<'a>(
let meta = ad.metadata(); let meta = ad.metadata();
eprintln!("adapter: {}", &meta.name); eprintln!("adapter: {}", &meta.name);
let db_name = format!("{}.v{}", meta.name, meta.version); let db_name = format!("{}.v{}", meta.name, meta.version);
if let Some(db_arc) = mb_db_arc { if let Some(cache) = config.cache.as_mut() {
let cache_key: Vec<u8> = { let cache_key: Vec<u8> = {
let clean_path = filepath_hint.to_owned().clean(); let clean_path = filepath_hint.to_owned().clean();
eprintln!("clean path: {:?}", clean_path);
let meta = std::fs::metadata(&filepath_hint)?; let meta = std::fs::metadata(&filepath_hint)?;
let key = ( let key = (
@ -85,24 +62,10 @@ pub fn rga_preproc<'a>(
bincode::serialize(&key).expect("could not serialize path") // key in the cache database bincode::serialize(&key).expect("could not serialize path") // key in the cache database
}; };
let db_env = db_arc.read().unwrap(); cache.get_or_run(
let db = db_env &db_name,
.open_single(db_name.as_str(), rkv::store::Options::create()) &cache_key,
.map_err(|p| format_err!("could not open db store: {:?}", p))?; Box::new(|| -> Fallible<Option<Vec<u8>>> {
let reader = db_env.read().expect("could not get reader");
let cached = db
.get(&reader, &cache_key)
.map_err(|p| format_err!("could not read from db: {:?}", p))?;
match cached {
Some(rkv::Value::Blob(cached)) => {
let stdouti = std::io::stdout();
zstd::stream::copy_decode(cached, stdouti.lock())?;
Ok(())
}
Some(_) => Err(format_err!("Integrity: value not blob")),
None => {
drop(reader);
// wrapping BufWriter here gives ~10% perf boost // wrapping BufWriter here gives ~10% perf boost
let mut compbuf = let mut compbuf =
BufWriter::new(CachingWriter::new(oup, MAX_DB_BLOB_LEN, ZSTD_LEVEL)?); BufWriter::new(CachingWriter::new(oup, MAX_DB_BLOB_LEN, ZSTD_LEVEL)?);
@ -114,6 +77,7 @@ pub fn rga_preproc<'a>(
inp, inp,
oup: &mut compbuf, oup: &mut compbuf,
archive_recursion_depth: 0, archive_recursion_depth: 0,
config: &mut PreprocConfig { cache: None },
})?; })?;
let compressed = compbuf let compressed = compbuf
.into_inner() .into_inner()
@ -122,21 +86,16 @@ pub fn rga_preproc<'a>(
.finish()?; .finish()?;
if let Some(cached) = compressed { if let Some(cached) = compressed {
eprintln!("compressed len: {}", cached.len()); eprintln!("compressed len: {}", cached.len());
};
{ Ok(None)
let mut writer = db_env.write().map_err(|p| { }),
format_err!("could not open write handle to cache: {:?}", p) Box::new(|cached| {
})?; let stdouti = std::io::stdout();
db.put(&mut writer, &cache_key, &rkv::Value::Blob(&cached)) zstd::stream::copy_decode(cached, stdouti.lock())?;
.map_err(|p| { Ok(())
format_err!("could not write to cache: {:?}", p) }),
})?; )?;
writer.commit().unwrap();
}
}
Ok(()) Ok(())
}
}
} else { } else {
eprintln!("adapting..."); eprintln!("adapting...");
ad.adapt(AdaptInfo { ad.adapt(AdaptInfo {
@ -146,6 +105,7 @@ pub fn rga_preproc<'a>(
inp, inp,
oup, oup,
archive_recursion_depth: 0, archive_recursion_depth: 0,
config: &mut PreprocConfig { cache: None },
})?; })?;
Ok(()) Ok(())
} }

90
src/preproc_cache.rs Normal file
View File

@ -0,0 +1,90 @@
use failure::{format_err, Fallible};
/// Construct the default preprocessing cache (LMDB-backed), boxed behind
/// the [`PreprocCache`] trait so callers stay independent of the backend.
pub fn open() -> Fallible<Box<dyn PreprocCache>> {
    let cache = LmdbCache::open()?;
    Ok(Box::new(cache))
}
/// A key/value cache for the (compressed) output of preprocessing runs.
pub trait PreprocCache {
    // possible without second lambda?
    /// Look up `key` in the store named `db_name`.
    ///
    /// * Cache miss: `runner` is invoked; if it returns `Some(bytes)`,
    ///   those bytes are stored under `key` for future lookups.
    /// * Cache hit: `callback` is invoked with the cached bytes.
    ///
    /// Errors from either closure, or from the store itself, are propagated.
    fn get_or_run<'a>(
        &mut self,
        db_name: &str,
        key: &[u8],
        runner: Box<dyn FnOnce() -> Fallible<Option<Vec<u8>>> + 'a>,
        callback: Box<dyn FnOnce(&[u8]) -> Fallible<()> + 'a>,
    ) -> Fallible<()>;
}
/// Opens (or creates) the shared LMDB cache environment in the user's
/// cache directory, returning the process-wide singleton handle.
///
/// # Errors
/// Fails if the cache directory cannot be created/resolved; panics if the
/// singleton manager lock is poisoned or the environment cannot be built.
fn open_cache_db() -> Fallible<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>> {
    let app_cache = cachedir::CacheDirConfig::new("rga").get_cache_dir()?;

    let db_arc = rkv::Manager::singleton()
        .write()
        .expect("could not write db manager")
        .get_or_create(app_cache.as_path(), |p| {
            let mut builder = rkv::Rkv::environment_builder();
            builder
                // NOTE: set_flags REPLACES the whole flag set rather than
                // OR-ing into it, so all flags must be combined in a single
                // call — two separate set_flags calls would silently discard
                // the first set of flags.
                .set_flags(
                    // NO_SYNC | WRITE_MAP: not durable cuz it's a cache
                    rkv::EnvironmentFlags::NO_SYNC
                        | rkv::EnvironmentFlags::WRITE_MAP
                        // i'm not sure why NO_TLS is needed. otherwise LMDB
                        // transactions (open readers) will keep piling up
                        // until it fails with LmdbError(ReadersFull)
                        // hope it doesn't break integrity
                        | rkv::EnvironmentFlags::NO_TLS,
                )
                .set_map_size(2 * 1024 * 1024 * 1024)
                .set_max_dbs(100)
                .set_max_readers(128);
            rkv::Rkv::from_env(p, builder)
        })
        .expect("could not get/create db");
    Ok(db_arc)
}
/// [`PreprocCache`] implementation backed by an on-disk LMDB database
/// (accessed through the `rkv` wrapper).
pub struct LmdbCache {
    // shared handle to the LMDB environment; the RwLock mediates
    // environment-level access, the Arc allows the rkv singleton manager
    // to hand out the same environment to multiple owners
    db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv>>,
}
impl LmdbCache {
    /// Open (or create) the on-disk LMDB cache environment and wrap it.
    pub fn open() -> Fallible<LmdbCache> {
        let db_arc = open_cache_db()?;
        Ok(LmdbCache { db_arc })
    }
}
impl PreprocCache for LmdbCache {
    // possible without second lambda?
    /// See [`PreprocCache::get_or_run`]: serve `key` from the LMDB
    /// sub-database `db_name`, or compute the value via `runner` and
    /// store it for next time.
    fn get_or_run<'a>(
        &mut self,
        db_name: &str,
        key: &[u8],
        runner: Box<dyn FnOnce() -> Fallible<Option<Vec<u8>>> + 'a>,
        callback: Box<dyn FnOnce(&[u8]) -> Fallible<()> + 'a>,
    ) -> Fallible<()> {
        let db_env = self.db_arc.read().unwrap();
        // open (or create) the named sub-database within the environment
        let db = db_env
            .open_single(db_name, rkv::store::Options::create())
            .map_err(|p| format_err!("could not open db store: {:?}", p))?;
        let reader = db_env.read().expect("could not get reader");
        let cached = db
            .get(&reader, &key)
            .map_err(|p| format_err!("could not read from db: {:?}", p))?;
        match cached {
            // cache hit: hand the stored blob to the caller's callback
            Some(rkv::Value::Blob(cached)) => {
                callback(cached)?;
            }
            // any non-blob value means the store is corrupt for our purposes
            Some(_) => Err(format_err!("Integrity: value not blob"))?,
            None => {
                // cache miss: explicitly drop the read transaction BEFORE
                // running the (possibly long) runner and opening a write
                // handle — open readers otherwise pile up (see the NO_TLS
                // note in open_cache_db)
                drop(reader);
                if let Some(got) = runner()? {
                    // runner produced a cacheable result; persist it
                    let mut writer = db_env.write().map_err(|p| {
                        format_err!("could not open write handle to cache: {:?}", p)
                    })?;
                    db.put(&mut writer, &key, &rkv::Value::Blob(&got))
                        .map_err(|p| format_err!("could not write to cache: {:?}", p))?;
                    writer.commit()?;
                }
                // runner returning None means "do not cache" — nothing stored
            }
        };
        Ok(())
    }
}