prepare for recursion

This commit is contained in:
phiresky 2022-10-30 00:24:46 +02:00
parent af8cf228b3
commit f401a96386
3 changed files with 28 additions and 41 deletions

View File

@ -96,7 +96,7 @@ pub fn pipe_output<'a>(
tokio::spawn(async move { tokio::spawn(async move {
let mut z = inp; let mut z = inp;
tokio::io::copy(&mut z, &mut stdi).await; tokio::io::copy(&mut z, &mut stdi).await.unwrap();
}); });
Ok(Box::pin(stdo.chain(proc_wait(cmd)))) Ok(Box::pin(stdo.chain(proc_wait(cmd))))
} }
@ -177,7 +177,7 @@ mod test {
let input = format!("{0}{0}{0}{0}", input); let input = format!("{0}{0}{0}{0}", input);
let (a, d) = simple_adapt_info( let (a, d) = simple_adapt_info(
&Path::new("foo.txt"), &Path::new("foo.txt"),
Box::pin(Cursor::new(input.as_bytes())), Box::pin(Cursor::new(Vec::from(input))),
); );
let output = adapter.adapt(a, &d).unwrap(); let output = adapter.adapt(a, &d).unwrap();

View File

@ -1,3 +1,4 @@
use crate::adapted_iter::AdaptedFilesIterBox;
use crate::adapters::*; use crate::adapters::*;
use crate::caching_writer::async_read_and_write_to_cache; use crate::caching_writer::async_read_and_write_to_cache;
use crate::config::RgaConfig; use crate::config::RgaConfig;
@ -8,10 +9,12 @@ use crate::{
print_bytes, print_bytes,
}; };
use anyhow::*; use anyhow::*;
use async_compression::tokio::bufread::ZstdDecoder;
use log::*; use log::*;
use path_clean::PathClean; use path_clean::PathClean;
// use postproc::PostprocPrefix; // use postproc::PostprocPrefix;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor;
use std::path::Path; use std::path::Path;
use tokio::io::AsyncBufRead; use tokio::io::AsyncBufRead;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
@ -146,15 +149,6 @@ async fn run_adapter_recursively(
detection_reason: FileMatcher, detection_reason: FileMatcher,
active_adapters: ActiveAdapters, active_adapters: ActiveAdapters,
) -> Result<ReadBox> { ) -> Result<ReadBox> {
let AdaptInfo {
filepath_hint,
is_real_file,
inp,
line_prefix,
config,
archive_recursion_depth,
postprocess,
} = ai;
let meta = adapter.metadata(); let meta = adapter.metadata();
debug!( debug!(
"Chose adapter '{}' because of matcher {:?}", "Chose adapter '{}' because of matcher {:?}",
@ -162,50 +156,30 @@ async fn run_adapter_recursively(
); );
eprintln!( eprintln!(
"{} adapter: {}", "{} adapter: {}",
filepath_hint.to_string_lossy(), ai.filepath_hint.to_string_lossy(),
&meta.name &meta.name
); );
let db_name = format!("{}.v{}", meta.name, meta.version); let db_name = format!("{}.v{}", meta.name, meta.version);
let cache_compression_level = config.cache.compression_level; let cache_compression_level = ai.config.cache.compression_level;
let cache_max_blob_len = config.cache.max_blob_len; let cache_max_blob_len = ai.config.cache.max_blob_len;
let cache = if is_real_file { let cache = if ai.is_real_file {
LmdbCache::open(&config.cache)? LmdbCache::open(&ai.config.cache)?
} else { } else {
None None
}; };
let mut cache = cache.context("No cache?")?; let mut cache = cache.context("No cache?")?;
let cache_key: Vec<u8> = compute_cache_key(&filepath_hint, adapter.as_ref(), active_adapters)?; let cache_key: Vec<u8> =
compute_cache_key(&ai.filepath_hint, adapter.as_ref(), active_adapters)?;
// let dbg_ctx = format!("adapter {}", &adapter.metadata().name); // let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
let cached = cache.get(&db_name, &cache_key)?; let cached = cache.get(&db_name, &cache_key)?;
match cached { match cached {
Some(cached) => Ok(Box::pin( Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))),
async_compression::tokio::bufread::ZstdDecoder::new(std::io::Cursor::new(cached)),
)),
None => { None => {
debug!("cache MISS, running adapter"); debug!("cache MISS, running adapter");
debug!("adapting with caching..."); debug!("adapting with caching...");
let inp = adapter let inp = loop_adapt(adapter.as_ref(), detection_reason, ai)?;
.adapt(
AdaptInfo {
line_prefix,
filepath_hint: filepath_hint.clone(),
is_real_file,
inp,
archive_recursion_depth,
config,
postprocess,
},
&detection_reason,
)
.with_context(|| {
format!(
"adapting {} via {} failed",
filepath_hint.to_string_lossy(),
meta.name
)
})?;
let inp = concat_read_streams(inp); let inp = concat_read_streams(inp);
let inp = async_read_and_write_to_cache( let inp = async_read_and_write_to_cache(
inp, inp,
@ -228,3 +202,16 @@ async fn run_adapter_recursively(
} }
} }
} }
fn loop_adapt(
adapter: &dyn FileAdapter,
detection_reason: FileMatcher,
ai: AdaptInfo,
) -> anyhow::Result<AdaptedFilesIterBox> {
let fph = ai.filepath_hint.clone();
let inp = adapter
.adapt(ai, &detection_reason)
.with_context(|| format!("adapting {} via {} failed", fph.to_string_lossy(), adapter.metadata().name))?;
Ok(inp)
}

View File

@ -33,7 +33,7 @@ pub fn simple_adapt_info<'a>(filepath: &Path, inp: ReadBox) -> (AdaptInfo, FileM
) )
} }
pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result<Vec<u8>> { pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox) -> Result<Vec<u8>> {
let mut res = concat_read_streams(adapted); let mut res = concat_read_streams(adapted);
let mut buf = Vec::new(); let mut buf = Vec::new();