continue refactoring

This commit is contained in:
phiresky 2020-06-17 11:43:47 +02:00
parent 8b4a7ab38c
commit 0895e7a6cf
10 changed files with 370 additions and 304 deletions

View File

@ -4,7 +4,8 @@
- add loads of debug logs and performance timings when `--debug` is used - add loads of debug logs and performance timings when `--debug` is used
- better error messages via `anyhow` - better error messages via `anyhow`
- add cross-platform rga-fzf binary - add cross-platform rga-fzf binary
- add a config file including schema - add a config file (~/.config/ripgrep-all) that is generated on first use, including schema
- change adapter interface from `(&Read, &Write) -> ()` to `Read -> Read` to allow chaining of adapters
# 0.9.6 (2020-05-19) # 0.9.6 (2020-05-19)

View File

@ -10,8 +10,7 @@ pub mod sqlite;
//pub mod tesseract; //pub mod tesseract;
pub mod writing; pub mod writing;
// pub mod zip; // pub mod zip;
use crate::matching::*; use crate::{config::RgaConfig, matching::*};
use crate::preproc::PreprocConfig;
use anyhow::*; use anyhow::*;
use custom::builtin_spawning_adapters; use custom::builtin_spawning_adapters;
use custom::CustomAdapterConfig; use custom::CustomAdapterConfig;
@ -79,7 +78,7 @@ pub struct AdaptInfo {
pub inp: ReadBox, pub inp: ReadBox,
/// prefix every output line with this string to better indicate the file's location if it is in some archive /// prefix every output line with this string to better indicate the file's location if it is in some archive
pub line_prefix: String, pub line_prefix: String,
pub config: PreprocConfig, pub config: RgaConfig,
} }
/// (enabledAdapters, disabledAdapters) /// (enabledAdapters, disabledAdapters)

View File

@ -1,15 +1,17 @@
use rga::adapters::*; use rga::adapters::*;
use rga::preproc::*; use rga::preproc::*;
use rga::print_dur;
use ripgrep_all as rga; use ripgrep_all as rga;
use anyhow::Context; use anyhow::Context;
use std::fs::File; use log::debug;
use std::{fs::File, time::Instant};
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
let mut arg_arr: Vec<std::ffi::OsString> = std::env::args_os().collect(); let mut arg_arr: Vec<std::ffi::OsString> = std::env::args_os().collect();
let last = arg_arr.pop().expect("No filename specified"); let last = arg_arr.pop().expect("No filename specified");
let args = rga::args::parse_args(arg_arr, true)?; let config = rga::config::parse_args(arg_arr, true)?;
//clap::App::new("rga-preproc").arg(Arg::from_usage()) //clap::App::new("rga-preproc").arg(Arg::from_usage())
let path = { let path = {
let filepath = last; let filepath = last;
@ -18,20 +20,19 @@ fn main() -> anyhow::Result<()> {
let i = File::open(&path).context("Specified input file not found")?; let i = File::open(&path).context("Specified input file not found")?;
let mut o = std::io::stdout(); let mut o = std::io::stdout();
let cache = if args.no_cache {
None
} else {
Some(rga::preproc_cache::open().context("could not open cache")?)
};
let ai = AdaptInfo { let ai = AdaptInfo {
inp: Box::new(i), inp: Box::new(i),
filepath_hint: path, filepath_hint: path,
is_real_file: true, is_real_file: true,
line_prefix: "".to_string(), line_prefix: "".to_string(),
archive_recursion_depth: 0, archive_recursion_depth: 0,
config: PreprocConfig { cache, args }, config,
}; };
let start = Instant::now();
let mut oup = rga_preproc(ai).context("during preprocessing")?; let mut oup = rga_preproc(ai).context("during preprocessing")?;
debug!("finding and starting adapter took {}", print_dur(start));
std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?; std::io::copy(&mut oup, &mut o).context("copying adapter output to stdout")?;
debug!("running adapter took {} total", print_dur(start));
Ok(()) Ok(())
} }

View File

@ -1,7 +1,7 @@
use anyhow::Result; use anyhow::Result;
use rga::adapters::spawning::map_exe_error; use rga::adapters::spawning::map_exe_error;
use rga::adapters::*; use rga::adapters::*;
use rga::args::*; use rga::config::{split_args, RgaConfig};
use rga::matching::*; use rga::matching::*;
use rga::print_dur; use rga::print_dur;
use ripgrep_all as rga; use ripgrep_all as rga;

View File

@ -1,32 +1,39 @@
use anyhow::Result; use anyhow::Result;
use log::*; use log::*;
use std::io::Write; use std::io::{BufReader, Read, Write};
/** /**
* wrap a writer so that it is passthrough, * wrap a writer so that it is passthrough,
* but also the written data is compressed and written into a buffer, * but also the written data is compressed and written into a buffer,
* unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough. * unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
*/ */
pub struct CachingWriter<W: Write> { pub struct CachingReader<R: Read> {
max_cache_size: usize, max_cache_size: usize,
zstd_writer: Option<zstd::stream::write::Encoder<Vec<u8>>>, zstd_writer: Option<zstd::stream::write::Encoder<Vec<u8>>>,
out: W, inp: R,
bytes_written: u64, bytes_written: u64,
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
} }
impl<W: Write> CachingWriter<W> { impl<R: Read> CachingReader<R> {
pub fn new(out: W, max_cache_size: usize, compression_level: i32) -> Result<CachingWriter<W>> { pub fn new(
Ok(CachingWriter { inp: R,
out, max_cache_size: usize,
compression_level: i32,
on_finish: Box<dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send>,
) -> Result<CachingReader<R>> {
Ok(CachingReader {
inp,
max_cache_size, max_cache_size,
zstd_writer: Some(zstd::stream::write::Encoder::new( zstd_writer: Some(zstd::stream::write::Encoder::new(
Vec::new(), Vec::new(),
compression_level, compression_level,
)?), )?),
bytes_written: 0, bytes_written: 0,
on_finish,
}) })
} }
pub fn finish(self) -> std::io::Result<(u64, Option<Vec<u8>>)> { pub fn finish(&mut self) -> std::io::Result<(u64, Option<Vec<u8>>)> {
if let Some(writer) = self.zstd_writer { if let Some(writer) = self.zstd_writer.take() {
let res = writer.finish()?; let res = writer.finish()?;
if res.len() <= self.max_cache_size { if res.len() <= self.max_cache_size {
return Ok((self.bytes_written, Some(res))); return Ok((self.bytes_written, Some(res)));
@ -34,32 +41,37 @@ impl<W: Write> CachingWriter<W> {
} }
Ok((self.bytes_written, None)) Ok((self.bytes_written, None))
} }
} fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
impl<W: Write> Write for CachingWriter<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let written_bytes = match self.zstd_writer.as_mut() {
Some(writer) => {
let wrote = writer.write(buf)?;
let compressed_len = writer.get_ref().len();
trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
if compressed_len > self.max_cache_size {
debug!("cache longer than max, dropping");
//writer.finish();
self.zstd_writer.take().unwrap().finish()?;
}
self.out.write_all(&buf[0..wrote])?;
Ok(wrote)
}
None => self.out.write(buf),
}?;
self.bytes_written += written_bytes as u64;
Ok(written_bytes)
}
fn flush(&mut self) -> std::io::Result<()> {
debug!("flushing");
if let Some(writer) = self.zstd_writer.as_mut() { if let Some(writer) = self.zstd_writer.as_mut() {
writer.flush()?; let wrote = writer.write(buf)?;
let compressed_len = writer.get_ref().len();
trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
if compressed_len > self.max_cache_size {
debug!("cache longer than max, dropping");
//writer.finish();
self.zstd_writer.take().unwrap().finish()?;
}
}
Ok(())
}
}
impl<R: Read> Read for CachingReader<R> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
match self.inp.read(&mut buf) {
Ok(0) => {
// move out of box, replace with noop lambda
let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
// EOF, finish!
(on_finish)(self.finish()?)
.map(|()| 0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
Ok(read_bytes) => {
self.write_to_compressed(&buf[0..read_bytes])?;
self.bytes_written += read_bytes as u64;
Ok(read_bytes)
}
Err(e) => Err(e),
} }
self.out.flush()
} }
} }

View File

@ -41,7 +41,23 @@ impl Default for MaxArchiveRecursion {
} }
} }
#[derive(JsonSchema, Debug, Serialize, Deserialize, Copy, Clone, PartialEq)] #[derive(JsonSchema, Debug, Serialize, Deserialize, Clone, PartialEq, FromStr)]
pub struct CachePath(pub String);
impl ToString for CachePath {
fn to_string(&self) -> String {
self.0.to_string()
}
}
impl Default for CachePath {
fn default() -> Self {
let pd = project_dirs().expect("could not get cache path");
let app_cache = pd.cache_dir();
CachePath(app_cache.to_str().expect("cache path not utf8").to_owned())
}
}
#[derive(JsonSchema, Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
pub struct CacheMaxBlobLen(pub usize); pub struct CacheMaxBlobLen(pub usize);
impl ToString for CacheMaxBlobLen { impl ToString for CacheMaxBlobLen {
@ -96,18 +112,6 @@ impl FromStr for CacheMaxBlobLen {
usage = "rga [RGA OPTIONS] [RG OPTIONS] PATTERN [PATH ...]" usage = "rga [RGA OPTIONS] [RG OPTIONS] PATTERN [PATH ...]"
)] )]
pub struct RgaConfig { pub struct RgaConfig {
/// Disable caching of results
///
/// By default, rga caches the extracted text, if it is small enough,
/// to a database in ~/.cache/rga on Linux,
/// ~/Library/Caches/rga on macOS,
/// or C:\Users\username\AppData\Local\rga on Windows.
/// This way, repeated searches on the same set of files will be much faster.
/// If you pass this flag, all caching will be disabled.
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(long = "--rga-no-cache")]
pub no_cache: bool,
/// Use more accurate but slower matching by mime type /// Use more accurate but slower matching by mime type
/// ///
/// By default, rga will match files using file extensions. /// By default, rga will match files using file extensions.
@ -133,31 +137,9 @@ pub struct RgaConfig {
)] )]
pub adapters: Vec<String>, pub adapters: Vec<String>,
/// Max compressed size to cache
///
/// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached and recomputed every time. Allowed suffixes: k M G
#[serde(default, skip_serializing_if = "is_default")] #[serde(default, skip_serializing_if = "is_default")]
#[structopt( #[structopt(flatten)]
default_value, pub cache: CacheConfig,
long = "--rga-cache-max-blob-len",
hidden_short_help = true,
require_equals = true,
// parse(try_from_str = parse_readable_bytes_str)
)]
pub cache_max_blob_len: CacheMaxBlobLen,
/// ZSTD compression level to apply to adapter outputs before storing in cache db
///
/// Ranges from 1 - 22
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(
default_value,
long = "--rga-cache-compression-level",
hidden_short_help = true,
require_equals = true,
help = ""
)]
pub cache_compression_level: CacheCompressionLevel,
/// Maximum nestedness of archives to recurse into /// Maximum nestedness of archives to recurse into
#[serde(default, skip_serializing_if = "is_default")] #[serde(default, skip_serializing_if = "is_default")]
@ -210,6 +192,60 @@ pub struct RgaConfig {
pub rg_version: bool, pub rg_version: bool,
} }
#[derive(StructOpt, Debug, Deserialize, Serialize, JsonSchema, Default, Clone, PartialEq)]
pub struct CacheConfig {
/// Disable caching of results
///
/// By default, rga caches the extracted text, if it is small enough,
/// to a database in ~/.cache/rga on Linux,
/// ~/Library/Caches/rga on macOS,
/// or C:\Users\username\AppData\Local\rga on Windows.
/// This way, repeated searches on the same set of files will be much faster.
/// If you pass this flag, all caching will be disabled.
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(long = "--rga-no-cache")]
pub disabled: bool,
/// Max compressed size to cache
///
/// Longest byte length (after compression) to store in cache. Longer adapter outputs will not be cached and recomputed every time. Allowed suffixes: k M G
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(
default_value,
long = "--rga-cache-max-blob-len",
hidden_short_help = true,
require_equals = true,
// parse(try_from_str = parse_readable_bytes_str)
)]
pub max_blob_len: CacheMaxBlobLen,
/// ZSTD compression level to apply to adapter outputs before storing in cache db
///
/// Ranges from 1 - 22
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(
default_value,
long = "--rga-cache-compression-level",
hidden_short_help = true,
require_equals = true,
help = ""
)]
pub compression_level: CacheCompressionLevel,
/// ZSTD compression level to apply to adapter outputs before storing in cache db
///
/// Ranges from 1 - 22
#[serde(default, skip_serializing_if = "is_default")]
#[structopt(
default_value,
long = "--rga-cache-path",
hidden_short_help = true,
require_equals = true,
help = ""
)]
pub path: CachePath,
}
static RGA_CONFIG: &str = "RGA_CONFIG"; static RGA_CONFIG: &str = "RGA_CONFIG";
use serde_json::Value; use serde_json::Value;

View File

@ -3,8 +3,8 @@
#![feature(specialization)] #![feature(specialization)]
pub mod adapters; pub mod adapters;
pub mod args;
mod caching_writer; mod caching_writer;
pub mod config;
pub mod matching; pub mod matching;
pub mod pipe; pub mod pipe;
pub mod preproc; pub mod preproc;
@ -13,7 +13,7 @@ pub mod preproc_cache;
pub mod test_utils; pub mod test_utils;
use anyhow::Context; use anyhow::Context;
use anyhow::Result; use anyhow::Result;
pub use caching_writer::CachingWriter; pub use caching_writer::CachingReader;
use directories_next::ProjectDirs; use directories_next::ProjectDirs;
use std::time::Instant; use std::time::Instant;

View File

@ -1,24 +1,18 @@
use crate::adapters::*; use crate::adapters::*;
use crate::args::RgaConfig;
use crate::matching::*; use crate::matching::*;
use crate::{print_bytes, print_dur, CachingWriter}; use crate::{
config::RgaConfig,
preproc_cache::{LmdbCache, PreprocCache},
print_bytes, print_dur, CachingReader,
};
use anyhow::*; use anyhow::*;
use log::*; use log::*;
use path_clean::PathClean;
use std::convert::TryInto;
use std::io::{BufRead, BufReader};
use std::{path::PathBuf, rc::Rc, time::Instant};
use std::io::BufReader;
use std::{
sync::{Arc, RwLock},
time::Instant,
};
#[derive(Clone)]
pub struct PreprocConfig {
pub cache: Option<Arc<RwLock<dyn crate::preproc_cache::PreprocCache>>>,
pub args: RgaConfig,
}
/** /**
* preprocess a file as defined in `ai`. * preprocess a file as defined in `ai`.
* *
@ -36,164 +30,52 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
.. ..
} = ai; } = ai;
debug!("path (hint) to preprocess: {:?}", filepath_hint); debug!("path (hint) to preprocess: {:?}", filepath_hint);
let PreprocConfig { cache: _, args } = config; let filtered_adapters =
let filtered_adapters = get_adapters_filtered(args.custom_adapters.clone(), &args.adapters)?; get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
let adapters = adapter_matcher(&filtered_adapters, args.accurate)?; let adapters = adapter_matcher(&filtered_adapters, config.accurate)?;
let filename = filepath_hint let filename = filepath_hint
.file_name() .file_name()
.ok_or_else(|| format_err!("Empty filename"))?; .ok_or_else(|| format_err!("Empty filename"))?;
debug!("Archive recursion depth: {}", archive_recursion_depth); debug!("Archive recursion depth: {}", archive_recursion_depth);
if archive_recursion_depth >= args.max_archive_recursion.0 { if archive_recursion_depth >= config.max_archive_recursion.0 {
let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes(); let s = format!("{}[rga: max archive recursion reached]", line_prefix).into_bytes();
return Ok(Box::new(std::io::Cursor::new(s))); return Ok(Box::new(std::io::Cursor::new(s)));
} }
// todo: figure out when using a bufreader is a good idea and when it is not // todo: figure out when using a bufreader is a good idea and when it is not
// seems to be good for File::open() reads, but not sure about within archives (tar, zip) // seems to be good for File::open() reads, but not sure about within archives (tar, zip)
let inp = BufReader::with_capacity(1 << 16, inp); let mut inp = BufReader::with_capacity(1 << 16, inp);
let mimetype = None; /*if args.accurate { let mimetype = if config.accurate {
let buf = inp.fill_buf()?; // fill but do not consume! let buf = inp.fill_buf()?; // fill but do not consume!
let mimetype = tree_magic::from_u8(buf); let mimetype = tree_magic::from_u8(buf);
debug!("mimetype: {:?}", mimetype); debug!("mimetype: {:?}", mimetype);
Some(mimetype) Some(mimetype)
} else { } else {
None None
};*/ };
let adapter = adapters(FileMeta { let adapter = adapters(FileMeta {
mimetype, mimetype,
lossy_filename: filename.to_string_lossy().to_string(), lossy_filename: filename.to_string_lossy().to_string(),
}); });
match adapter { match adapter {
Some((adapter, detection_reason)) => { Some((adapter, detection_reason)) => run_adapter(
let meta = adapter.metadata(); AdaptInfo {
debug!( filepath_hint,
"Chose adapter '{}' because of matcher {:?}", is_real_file,
&meta.name, &detection_reason inp: Box::new(inp),
); line_prefix,
eprintln!( config,
"{} adapter: {}", archive_recursion_depth,
filepath_hint.to_string_lossy(), },
&meta.name adapter,
); detection_reason,
let _db_name = format!("{}.v{}", meta.name, meta.version); &filtered_adapters,
/*if let Some(cache) = cache.as_mut() { ),
let cache_key: Vec<u8> = {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(&filepath_hint)?;
let modified = meta.modified().expect("weird OS that can't into mtime");
if adapter.metadata().recurses {
let key = (
filtered_adapters
.iter()
.map(|a| (a.metadata().name.clone(), a.metadata().version))
.collect::<Vec<_>>(),
clean_path,
modified,
);
debug!("Cache key (with recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
} else {
let key = (
adapter.metadata().name.clone(),
adapter.metadata().version,
clean_path,
modified,
);
debug!("Cache key (no recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
}
};
cache.write().unwrap().get_or_run(
&db_name,
&cache_key,
&adapter.metadata().name,
Box::new(|| -> Result<Option<Vec<u8>>> {
// wrapping BufWriter here gives ~10% perf boost
let mut compbuf = BufWriter::new(CachingWriter::new(
oup,
args.cache_max_blob_len.0.try_into().unwrap(),
args.cache_compression_level.0.try_into().unwrap(),
)?);
debug!("adapting with caching...");
adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint,
is_real_file,
inp,
oup: &mut compbuf,
archive_recursion_depth,
config: PreprocConfig { cache: None, args },
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
let (uncompressed_size, compressed) = compbuf
.into_inner()
.map_err(|_| anyhow!("could not finish zstd"))? // can't use with_context here
.finish()?;
debug!(
"uncompressed output: {}",
print_bytes(uncompressed_size as f64)
);
if let Some(cached) = compressed {
debug!("compressed output: {}", print_bytes(cached.len() as f64));
Ok(Some(cached))
} else {
Ok(None)
}
}),
Box::new(|cached| {
let stdouti = std::io::stdout();
zstd::stream::copy_decode(cached, stdouti.lock())?;
Ok(())
}),
)?;
Ok(())
} else { */
// no cache arc - probably within archive
debug!("adapting without caching...");
let start = Instant::now();
let oread = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp: Box::new(inp),
archive_recursion_depth,
config: PreprocConfig { cache: None, args },
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} without caching failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
debug!(
"running adapter {} took {}",
adapter.metadata().name,
print_dur(start)
);
Ok(oread)
/* }*/
}
None => { None => {
// allow passthrough if the file is in an archive or accurate matching is enabled // allow passthrough if the file is in an archive or accurate matching is enabled
// otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us // otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
let allow_cat = !is_real_file || args.accurate; let allow_cat = !is_real_file || config.accurate;
if allow_cat { if allow_cat {
Ok(Box::new(inp)) Ok(Box::new(inp))
} else { } else {
@ -205,3 +87,141 @@ pub fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
} }
} }
} }
fn run_adapter(
ai: AdaptInfo,
adapter: Rc<dyn FileAdapter>,
detection_reason: SlowMatcher,
filtered_adapters: &Vec<Rc<dyn FileAdapter>>,
) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
is_real_file,
inp,
line_prefix,
config,
archive_recursion_depth,
..
} = ai;
let meta = adapter.metadata();
debug!(
"Chose adapter '{}' because of matcher {:?}",
&meta.name, &detection_reason
);
eprintln!(
"{} adapter: {}",
filepath_hint.to_string_lossy(),
&meta.name
);
let db_name = format!("{}.v{}", meta.name, meta.version);
let cache_compression_level = config.cache.compression_level;
let cache_max_blob_len = config.cache.max_blob_len;
if let Some(mut cache) = LmdbCache::open(&config.cache)? {
let cache_key: Vec<u8> = {
let clean_path = filepath_hint.to_owned().clean();
let meta = std::fs::metadata(&filepath_hint)?;
let modified = meta.modified().expect("weird OS that can't into mtime");
if adapter.metadata().recurses {
let key = (
filtered_adapters
.iter()
.map(|a| (a.metadata().name.clone(), a.metadata().version))
.collect::<Vec<_>>(),
clean_path,
modified,
);
debug!("Cache key (with recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
} else {
let key = (
adapter.metadata().name.clone(),
adapter.metadata().version,
clean_path,
modified,
);
debug!("Cache key (no recursion): {:?}", key);
bincode::serialize(&key).expect("could not serialize path")
}
};
// let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
let cached = cache.get(&db_name, &cache_key)?;
match cached {
Some(cached) => Ok(Box::new(
zstd::stream::read::Decoder::new(std::io::Cursor::new(cached))
.context("could not create zstd decoder")?,
)),
None => {
debug!("cache MISS, running adapter");
debug!("adapting with caching...");
let inp = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp: Box::new(inp),
archive_recursion_depth,
config,
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
let inp = CachingReader::new(
inp,
cache_max_blob_len.0.try_into().unwrap(),
cache_compression_level.0.try_into().unwrap(),
Box::new(move |(uncompressed_size, compressed)| {
debug!(
"uncompressed output: {}",
print_bytes(uncompressed_size as f64)
);
if let Some(cached) = compressed {
debug!("compressed output: {}", print_bytes(cached.len() as f64));
cache.set(&db_name, &cache_key, &cached)?
}
Ok(())
}),
)?;
Ok(Box::new(inp))
}
}
} else {
// no cache arc - probably within archive
debug!("adapting without caching...");
let start = Instant::now();
let oread = adapter
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp: Box::new(inp),
archive_recursion_depth,
config,
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} without caching failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
debug!(
"running adapter {} took {}",
adapter.metadata().name,
print_dur(start)
);
Ok(oread)
}
}

View File

@ -1,37 +1,37 @@
use crate::{print_bytes, print_dur, project_dirs}; use crate::{config::CacheConfig, print_bytes, print_dur, project_dirs};
use anyhow::{format_err, Context, Result}; use anyhow::{format_err, Context, Result};
use log::*; use log::*;
use std::{ use std::{
fmt::Display, fmt::Display,
path::Path,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::Instant, time::Instant,
}; };
pub fn open() -> Result<Arc<RwLock<dyn PreprocCache>>> {
Ok(Arc::new(RwLock::new(LmdbCache::open()?)))
}
pub trait PreprocCache: Send + Sync { pub trait PreprocCache: Send + Sync {
// possible without second lambda? /*/// gets cache at specified key.
/// if cache hit, return the resulting data
/// else, run the given lambda, and store its result in the cache if present
fn get_or_run<'a>( fn get_or_run<'a>(
&mut self, &mut self,
db_name: &str, db_name: &str,
key: &[u8], key: &[u8],
adapter_name: &str, debug_name: &str,
runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>, runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
callback: Box<dyn FnOnce(&[u8]) -> Result<()> + 'a>, ) -> Result<Option<Vec<u8>>>;*/
) -> Result<()>;
fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
} }
/// opens a LMDB cache /// opens a LMDB cache
fn open_cache_db() -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>> { fn open_cache_db(path: &Path) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv>>> {
let pd = project_dirs()?; std::fs::create_dir_all(path)?;
let app_cache = pd.cache_dir();
std::fs::create_dir_all(app_cache)?;
rkv::Manager::singleton() rkv::Manager::singleton()
.write() .write()
.map_err(|_| format_err!("could not write cache db manager"))? .map_err(|_| format_err!("could not write cache db manager"))?
.get_or_create(app_cache, |p| { .get_or_create(path, |p| {
let mut builder = rkv::Rkv::environment_builder(); let mut builder = rkv::Rkv::environment_builder();
builder builder
.set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache .set_flags(rkv::EnvironmentFlags::NO_SYNC | rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
@ -53,10 +53,14 @@ pub struct LmdbCache {
} }
impl LmdbCache { impl LmdbCache {
pub fn open() -> Result<LmdbCache> { pub fn open(config: &CacheConfig) -> Result<Option<LmdbCache>> {
Ok(LmdbCache { if config.disabled {
db_arc: open_cache_db()?, return Ok(None);
}) }
let path = Path::new(&config.path.0);
Ok(Some(LmdbCache {
db_arc: open_cache_db(&path)?,
}))
} }
} }
@ -70,27 +74,22 @@ impl Display for RkvErrWrap {
impl std::error::Error for RkvErrWrap {} impl std::error::Error for RkvErrWrap {}
impl PreprocCache for LmdbCache { impl PreprocCache for LmdbCache {
// possible without second lambda? fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
fn get_or_run<'a>(
&mut self,
db_name: &str,
key: &[u8],
adapter_name: &str,
runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
callback: Box<dyn FnOnce(&[u8]) -> Result<()> + 'a>,
) -> Result<()> {
let start = Instant::now(); let start = Instant::now();
let db_env = self.db_arc.read().unwrap(); let db_env = self
.db_arc
.read()
.map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
let db = db_env let db = db_env
.open_single(db_name, rkv::store::Options::create()) .open_single(db_name, rkv::store::Options::create())
.map_err(RkvErrWrap) .map_err(RkvErrWrap)
.with_context(|| format_err!("could not open cache db store"))?; .context("could not open cache db store")?;
let reader = db_env.read().expect("could not get reader"); let reader = db_env.read().expect("could not get reader");
let cached = db let cached = db
.get(&reader, &key) .get(&reader, &key)
.map_err(RkvErrWrap) .map_err(RkvErrWrap)
.with_context(|| format_err!("could not read from db"))?; .context("could not read from db")?;
match cached { match cached {
Some(rkv::Value::Blob(cached)) => { Some(rkv::Value::Blob(cached)) => {
@ -99,34 +98,38 @@ impl PreprocCache for LmdbCache {
print_bytes(cached.len() as f64) print_bytes(cached.len() as f64)
); );
debug!("reading from cache took {}", print_dur(start)); debug!("reading from cache took {}", print_dur(start));
callback(cached)?; Ok(Some(Vec::from(cached)))
} }
Some(_) => Err(format_err!("Integrity: value not blob"))?, Some(_) => Err(format_err!("Integrity: value not blob"))?,
None => { None => Ok(None),
debug!("cache MISS, running adapter"); }
drop(reader); }
let runner_res = runner()?; fn set(&mut self, db_name: &str, key: &[u8], got: &[u8]) -> Result<()> {
debug!("running adapter {} took {}", adapter_name, print_dur(start)); let start = Instant::now();
let start = Instant::now(); debug!("writing {} to cache", print_bytes(got.len() as f64));
if let Some(got) = runner_res { let db_env = self
debug!("writing {} to cache", print_bytes(got.len() as f64)); .db_arc
let mut writer = db_env .read()
.write() .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
.map_err(RkvErrWrap)
.with_context(|| format_err!("could not open write handle to cache"))?; let db = db_env
db.put(&mut writer, &key, &rkv::Value::Blob(&got)) .open_single(db_name, rkv::store::Options::create())
.map_err(RkvErrWrap) .map_err(RkvErrWrap)
.with_context(|| format_err!("could not write to cache"))?; .context("could not open cache db store")?;
writer
.commit() let mut writer = db_env
.map_err(RkvErrWrap) .write()
.with_context(|| format!("could not write cache"))?; .map_err(RkvErrWrap)
debug!("writing to cache took {}", print_dur(start)); .with_context(|| format_err!("could not open write handle to cache"))?;
} else {
debug!("not caching output"); db.put(&mut writer, &key, &rkv::Value::Blob(&got))
} .map_err(RkvErrWrap)
} .with_context(|| format_err!("could not write to cache"))?;
}; writer
.commit()
.map_err(RkvErrWrap)
.with_context(|| format!("could not write cache"))?;
debug!("writing to cache took {}", print_dur(start));
Ok(()) Ok(())
} }
} }

View File

@ -1,12 +1,9 @@
use crate::{ use crate::{
adapters::{AdaptInfo, ReadBox}, adapters::{AdaptInfo, ReadBox},
args::RgaConfig, config::RgaConfig,
matching::{FastMatcher, SlowMatcher}, matching::{FastMatcher, SlowMatcher},
preproc::PreprocConfig,
};
use std::{
path::{Path, PathBuf},
}; };
use std::path::{Path, PathBuf};
pub fn test_data_dir() -> PathBuf { pub fn test_data_dir() -> PathBuf {
let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@ -22,10 +19,7 @@ pub fn simple_adapt_info(filepath: &Path, inp: ReadBox) -> (AdaptInfo, SlowMatch
archive_recursion_depth: 0, archive_recursion_depth: 0,
inp, inp,
line_prefix: "PREFIX:".to_string(), line_prefix: "PREFIX:".to_string(),
config: PreprocConfig { config: RgaConfig::default(),
cache: None,
args: RgaConfig::default(),
},
}, },
FastMatcher::FileExtension(filepath.extension().unwrap().to_string_lossy().into_owned()) FastMatcher::FileExtension(filepath.extension().unwrap().to_string_lossy().into_owned())
.into(), .into(),