diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs index 186566f..70962bd 100644 --- a/src/adapters/spawning.rs +++ b/src/adapters/spawning.rs @@ -96,7 +96,7 @@ pub fn pipe_output<'a>( tokio::spawn(async move { let mut z = inp; - tokio::io::copy(&mut z, &mut stdi).await; + tokio::io::copy(&mut z, &mut stdi).await.unwrap(); }); Ok(Box::pin(stdo.chain(proc_wait(cmd)))) } @@ -177,7 +177,7 @@ mod test { let input = format!("{0}{0}{0}{0}", input); let (a, d) = simple_adapt_info( &Path::new("foo.txt"), - Box::pin(Cursor::new(input.as_bytes())), + Box::pin(Cursor::new(Vec::from(input))), ); let output = adapter.adapt(a, &d).unwrap(); diff --git a/src/preproc.rs b/src/preproc.rs index 11bf317..16a6c73 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -1,3 +1,4 @@ +use crate::adapted_iter::AdaptedFilesIterBox; use crate::adapters::*; use crate::caching_writer::async_read_and_write_to_cache; use crate::config::RgaConfig; @@ -8,10 +9,12 @@ use crate::{ print_bytes, }; use anyhow::*; +use async_compression::tokio::bufread::ZstdDecoder; use log::*; use path_clean::PathClean; // use postproc::PostprocPrefix; use std::convert::TryInto; +use std::io::Cursor; use std::path::Path; use tokio::io::AsyncBufRead; use tokio::io::AsyncBufReadExt; @@ -146,15 +149,6 @@ async fn run_adapter_recursively( detection_reason: FileMatcher, active_adapters: ActiveAdapters, ) -> Result { - let AdaptInfo { - filepath_hint, - is_real_file, - inp, - line_prefix, - config, - archive_recursion_depth, - postprocess, - } = ai; let meta = adapter.metadata(); debug!( "Chose adapter '{}' because of matcher {:?}", @@ -162,50 +156,30 @@ async fn run_adapter_recursively( ); eprintln!( "{} adapter: {}", - filepath_hint.to_string_lossy(), + ai.filepath_hint.to_string_lossy(), &meta.name ); let db_name = format!("{}.v{}", meta.name, meta.version); - let cache_compression_level = config.cache.compression_level; - let cache_max_blob_len = config.cache.max_blob_len; + let cache_compression_level = ai.config.cache.compression_level; + let cache_max_blob_len = ai.config.cache.max_blob_len; - let cache = if is_real_file { - LmdbCache::open(&config.cache)? + let cache = if ai.is_real_file { + LmdbCache::open(&ai.config.cache)? } else { None }; let mut cache = cache.context("No cache?")?; - let cache_key: Vec = compute_cache_key(&filepath_hint, adapter.as_ref(), active_adapters)?; + let cache_key: Vec = + compute_cache_key(&ai.filepath_hint, adapter.as_ref(), active_adapters)?; // let dbg_ctx = format!("adapter {}", &adapter.metadata().name); let cached = cache.get(&db_name, &cache_key)?; match cached { - Some(cached) => Ok(Box::pin( - async_compression::tokio::bufread::ZstdDecoder::new(std::io::Cursor::new(cached)), - )), + Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))), None => { debug!("cache MISS, running adapter"); debug!("adapting with caching..."); - let inp = adapter - .adapt( - AdaptInfo { - line_prefix, - filepath_hint: filepath_hint.clone(), - is_real_file, - inp, - archive_recursion_depth, - config, - postprocess, - }, - &detection_reason, - ) - .with_context(|| { - format!( - "adapting {} via {} failed", - filepath_hint.to_string_lossy(), - meta.name - ) - })?; + let inp = loop_adapt(adapter.as_ref(), detection_reason, ai)?; let inp = concat_read_streams(inp); let inp = async_read_and_write_to_cache( inp, @@ -228,3 +202,16 @@ async fn run_adapter_recursively( } } } + +fn loop_adapt( + adapter: &dyn FileAdapter, + detection_reason: FileMatcher, + ai: AdaptInfo, +) -> anyhow::Result { + let fph = ai.filepath_hint.clone(); + let inp = adapter + .adapt(ai, &detection_reason) + .with_context(|| format!("adapting {} via {} failed", fph.to_string_lossy(), adapter.metadata().name))?; + + Ok(inp) +} diff --git a/src/test_utils.rs b/src/test_utils.rs index 2a6b870..0efce30 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -33,7 +33,7 @@ pub fn simple_adapt_info<'a>(filepath: &Path, inp: ReadBox) -> (AdaptInfo, FileM ) } -pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result> { +pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox) -> Result> { let mut res = concat_read_streams(adapted); let mut buf = Vec::new();