fix pinning and sending

This commit is contained in:
phiresky 2022-11-05 00:47:43 +01:00
parent 002c62f57c
commit 42fe225373
6 changed files with 80 additions and 65 deletions

View File

@ -1,8 +1,10 @@
use std::pin::Pin;
use tokio_stream::Stream; use tokio_stream::Stream;
use crate::adapters::AdaptInfo; use crate::adapters::AdaptInfo;
pub trait AdaptedFilesIter: Stream<Item = AdaptInfo> + Send + Unpin {} pub trait AdaptedFilesIter: Stream<Item = AdaptInfo> + Send {}
impl<T> AdaptedFilesIter for T where T: Stream<Item = AdaptInfo> + Send + Unpin {} impl<T> AdaptedFilesIter for T where T: Stream<Item = AdaptInfo> + Send {}
pub type AdaptedFilesIterBox = Box<dyn AdaptedFilesIter>; pub type AdaptedFilesIterBox = Pin<Box<dyn AdaptedFilesIter>>;

View File

@ -4,6 +4,7 @@ pub mod custom;
// pub mod postproc; // pub mod postproc;
// pub mod pdfpages; // pub mod pdfpages;
pub mod spawning; pub mod spawning;
use std::sync::Arc;
// pub mod sqlite; // pub mod sqlite;
// pub mod tar; // pub mod tar;
// pub mod tesseract; // pub mod tesseract;
@ -22,6 +23,7 @@ use std::iter::Iterator;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use core::fmt::Debug;
pub type ReadBox = Pin<Box<dyn AsyncRead + Send>>; pub type ReadBox = Pin<Box<dyn AsyncRead + Send>>;
pub struct AdapterMeta { pub struct AdapterMeta {
@ -76,7 +78,7 @@ impl AdapterMeta {
pub trait GetMetadata { pub trait GetMetadata {
fn metadata(&self) -> &AdapterMeta; fn metadata(&self) -> &AdapterMeta;
} }
pub trait FileAdapter: GetMetadata { pub trait FileAdapter: GetMetadata + Send + Sync{
/// adapt a file. /// adapt a file.
/// ///
/// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher /// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher
@ -99,22 +101,22 @@ pub struct AdaptInfo {
/// prefix every output line with this string to better indicate the file's location if it is in some archive /// prefix every output line with this string to better indicate the file's location if it is in some archive
pub line_prefix: String, pub line_prefix: String,
pub postprocess: bool, pub postprocess: bool,
pub config: RgaConfig pub config: RgaConfig,
} }
/// (enabledAdapters, disabledAdapters) /// (enabledAdapters, disabledAdapters)
type AdaptersTuple = (Vec<Rc<dyn FileAdapter>>, Vec<Rc<dyn FileAdapter>>); type AdaptersTuple = (Vec<Arc<dyn FileAdapter>>, Vec<Arc<dyn FileAdapter>>);
pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> AdaptersTuple { pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> AdaptersTuple {
// order in descending priority // order in descending priority
let mut adapters: Vec<Rc<dyn FileAdapter>> = vec![]; let mut adapters: Vec<Arc<dyn FileAdapter>> = vec![];
if let Some(custom_adapters) = custom_adapters { if let Some(custom_adapters) = custom_adapters {
for adapter_config in custom_adapters { for adapter_config in custom_adapters {
adapters.push(Rc::new(adapter_config.to_adapter())); adapters.push(Arc::new(adapter_config.to_adapter()));
} }
} }
let internal_adapters: Vec<Rc<dyn FileAdapter>> = vec![ let internal_adapters: Vec<Arc<dyn FileAdapter>> = vec![
//Rc::new(ffmpeg::FFmpegAdapter::new()), //Rc::new(ffmpeg::FFmpegAdapter::new()),
// Rc::new(zip::ZipAdapter::new()), // Rc::new(zip::ZipAdapter::new()),
//Rc::new(decompress::DecompressAdapter::new()), //Rc::new(decompress::DecompressAdapter::new()),
@ -126,7 +128,7 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
adapters.extend( adapters.extend(
builtin_spawning_adapters builtin_spawning_adapters
.iter() .iter()
.map(|e| -> Rc<dyn FileAdapter> { Rc::new(e.to_adapter()) }), .map(|e| -> Arc<dyn FileAdapter> { Arc::new(e.to_adapter()) }),
); );
adapters.extend(internal_adapters); adapters.extend(internal_adapters);
@ -146,7 +148,7 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
pub fn get_adapters_filtered<T: AsRef<str>>( pub fn get_adapters_filtered<T: AsRef<str>>(
custom_adapters: Option<Vec<CustomAdapterConfig>>, custom_adapters: Option<Vec<CustomAdapterConfig>>,
adapter_names: &Vec<T>, adapter_names: &Vec<T>,
) -> Result<Vec<Rc<dyn FileAdapter>>> { ) -> Result<Vec<Arc<dyn FileAdapter>>> {
let (def_enabled_adapters, def_disabled_adapters) = get_all_adapters(custom_adapters); let (def_enabled_adapters, def_disabled_adapters) = get_all_adapters(custom_adapters);
let adapters = if !adapter_names.is_empty() { let adapters = if !adapter_names.is_empty() {
let adapters_map: HashMap<_, _> = def_enabled_adapters let adapters_map: HashMap<_, _> = def_enabled_adapters

View File

@ -14,7 +14,7 @@ use tokio::io::AsyncReadExt;
use tokio::process::{Child, Command}; use tokio::process::{Child, Command};
// TODO: don't separate the trait and the struct // TODO: don't separate the trait and the struct
pub trait SpawningFileAdapterTrait: GetMetadata { pub trait SpawningFileAdapterTrait: GetMetadata + Send + Sync {
fn get_exe(&self) -> &str; fn get_exe(&self) -> &str;
fn command(&self, filepath_hint: &Path, command: Command) -> Result<Command>; fn command(&self, filepath_hint: &Path, command: Command) -> Result<Command>;
} }
@ -123,7 +123,7 @@ impl FileAdapter for SpawningFileAdapter {
.with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?; .with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?;
debug!("executing {:?}", cmd); debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?; let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?;
Ok(Box::new(tokio_stream::once(AdaptInfo { Ok(Box::pin(tokio_stream::once(AdaptInfo {
filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable
inp: output, inp: output,
line_prefix, line_prefix,

View File

@ -15,7 +15,7 @@ fn list_adapters(args: RgaConfig) -> Result<()> {
let (enabled_adapters, disabled_adapters) = get_all_adapters(args.custom_adapters.clone()); let (enabled_adapters, disabled_adapters) = get_all_adapters(args.custom_adapters.clone());
println!("Adapters:\n"); println!("Adapters:\n");
let print = |adapter: std::rc::Rc<dyn FileAdapter>| { let print = |adapter: std::rc::Arc<dyn FileAdapter>| {
let meta = adapter.metadata(); let meta = adapter.metadata();
let matchers = meta let matchers = meta
.fast_matchers .fast_matchers

View File

@ -9,7 +9,7 @@ use regex::{Regex, RegexSet};
use std::iter::Iterator; use std::iter::Iterator;
use std::rc::Rc; use std::sync::Arc;
// match only based on file path // match only based on file path
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -54,9 +54,9 @@ pub fn extension_to_regex(extension: &str) -> Regex {
} }
pub fn adapter_matcher( pub fn adapter_matcher(
adapters: &Vec<Rc<dyn FileAdapter>>, adapters: &Vec<Arc<dyn FileAdapter>>,
slow: bool, slow: bool,
) -> Result<impl Fn(FileMeta) -> Option<(Rc<dyn FileAdapter>, FileMatcher)>> { ) -> Result<impl Fn(FileMeta) -> Option<(Arc<dyn FileAdapter>, FileMatcher)>> {
// need order later // need order later
let adapter_names: Vec<String> = adapters.iter().map(|e| e.metadata().name.clone()).collect(); let adapter_names: Vec<String> = adapters.iter().map(|e| e.metadata().name.clone()).collect();
let mut fname_regexes = vec![]; let mut fname_regexes = vec![];

View File

@ -13,6 +13,7 @@ use async_compression::tokio::bufread::ZstdDecoder;
use async_stream::stream; use async_stream::stream;
use log::*; use log::*;
use path_clean::PathClean; use path_clean::PathClean;
use std::sync::Arc;
// use postproc::PostprocPrefix; // use postproc::PostprocPrefix;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor; use std::io::Cursor;
@ -21,16 +22,14 @@ use tokio::io::AsyncBufRead;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader; use tokio::io::BufReader;
use std::rc::Rc; type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
type ActiveAdapters = Vec<Rc<dyn FileAdapter>>;
async fn choose_adapter( async fn choose_adapter(
config: &RgaConfig, config: &RgaConfig,
filepath_hint: &Path, filepath_hint: &Path,
archive_recursion_depth: i32, archive_recursion_depth: i32,
inp: &mut (impl AsyncBufRead + Unpin), inp: &mut (impl AsyncBufRead + Unpin),
) -> Result<Option<(Rc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>> { ) -> Result<Option<(Arc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>> {
let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?; let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
let adapters = adapter_matcher(&active_adapters, config.accurate)?; let adapters = adapter_matcher(&active_adapters, config.accurate)?;
let filename = filepath_hint let filename = filepath_hint
@ -52,6 +51,46 @@ async fn choose_adapter(
}); });
Ok(adapter.map(|e| (e.0, e.1, active_adapters))) Ok(adapter.map(|e| (e.0, e.1, active_adapters)))
} }
async fn buf_choose_adapter(ai: AdaptInfo) -> Result<(AdaptInfo, Option<(Arc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>)> {
let mut inp = BufReader::with_capacity(1 << 16, ai.inp);
let adapter = choose_adapter(
&ai.config,
&ai.filepath_hint,
ai.archive_recursion_depth,
&mut inp,
)
.await?;
let ai = AdaptInfo {
inp: Box::pin(inp),
..ai
};
Ok((ai, adapter))
}
fn handle_no_adapter(ai: AdaptInfo) -> Result<AdaptInfo> {
// allow passthrough if the file is in an archive or accurate matching is enabled
// otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
let allow_cat = !ai.is_real_file || ai.config.accurate;
if allow_cat {
if ai.postprocess {
panic!("not implemented");
/* (
Rc::new(PostprocPrefix {}) as Arc<dyn FileAdapter>,
FileMatcher::Fast(FastFileMatcher::FileExtension("default".to_string())), // todo: separate enum value for this
)*/
} else {
return Ok(ai);
}
} else {
return Err(format_err!(
"No adapter found for file {:?}, passthrough disabled.",
ai.filepath_hint
.file_name()
.ok_or_else(|| format_err!("Empty filename"))?
));
}
}
/** /**
* preprocess a file as defined in `ai`. * preprocess a file as defined in `ai`.
* *
@ -67,46 +106,13 @@ pub async fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
// todo: figure out when using a bufreader is a good idea and when it is not // todo: figure out when using a bufreader is a good idea and when it is not
// seems to be good for File::open() reads, but not sure about within archives (tar, zip) // seems to be good for File::open() reads, but not sure about within archives (tar, zip)
let mut inp = BufReader::with_capacity(1 << 16, ai.inp); let (ai, adapter) = buf_choose_adapter(ai).await?;
let adapter = choose_adapter( let Some((adapter, detection_reason, active_adapters)) = adapter else {
&ai.config, return handle_no_adapter(ai).map(|ai| ai.inp);
&ai.filepath_hint,
ai.archive_recursion_depth,
&mut inp,
)
.await?;
let (adapter, detection_reason, active_adapters) = match adapter {
Some((a, d, e)) => (a, d, e),
None => {
// allow passthrough if the file is in an archive or accurate matching is enabled
// otherwise it should have been filtered out by rg pre-glob since rg can handle those better than us
let allow_cat = !ai.is_real_file || ai.config.accurate;
if allow_cat {
if ai.postprocess {
panic!("not implemented");
/* (
Rc::new(PostprocPrefix {}) as Rc<dyn FileAdapter>,
FileMatcher::Fast(FastFileMatcher::FileExtension("default".to_string())), // todo: separate enum value for this
)*/
} else {
return Ok(Box::pin(inp));
}
} else {
return Err(format_err!(
"No adapter found for file {:?}, passthrough disabled.",
ai.filepath_hint
.file_name()
.ok_or_else(|| format_err!("Empty filename"))?
));
}
}
}; };
let path_hint_copy = ai.filepath_hint.clone(); let path_hint_copy = ai.filepath_hint.clone();
run_adapter_recursively( adapt_caching(
AdaptInfo { ai,
inp: Box::pin(inp),
..ai
},
adapter, adapter,
detection_reason, detection_reason,
active_adapters, active_adapters,
@ -144,9 +150,10 @@ fn compute_cache_key(
bincode::serialize(&key).context("could not serialize path") bincode::serialize(&key).context("could not serialize path")
} }
} }
async fn run_adapter_recursively(
async fn adapt_caching(
ai: AdaptInfo, ai: AdaptInfo,
adapter: Rc<dyn FileAdapter>, adapter: Arc<dyn FileAdapter>,
detection_reason: FileMatcher, detection_reason: FileMatcher,
active_adapters: ActiveAdapters, active_adapters: ActiveAdapters,
) -> Result<ReadBox> { ) -> Result<ReadBox> {
@ -220,11 +227,15 @@ fn loop_adapt(
let s = stream! { let s = stream! {
for await file in inp { for await file in inp {
let (adapter, detection_reason) = choose_adapter(file.config, file.filepath_hint,file.archive_recursion_depth, file.inp); let (file, chosen_adapter) = buf_choose_adapter(file).await.expect("todo: handle");
for file in loop_adapt(adapter, detection_reason, file) { if let Some((adapter, detection_reason, active_adapters)) = chosen_adapter {
for await file in loop_adapt(adapter.as_ref(), detection_reason, file).expect("todo: handle") {
yield file; yield file;
} }
} else {
yield handle_no_adapter(file).expect("todo: handle");
}
} }
}; };
Ok(inp) Ok(Box::pin(s))
} }