cleanup (WIP)

This commit is contained in:
phiresky 2022-10-30 02:34:56 +02:00
parent f401a96386
commit 002c62f57c
4 changed files with 28 additions and 76 deletions

View File

@ -1,24 +1,8 @@
use tokio_stream::Stream;
use crate::adapters::AdaptInfo; use crate::adapters::AdaptInfo;
// TODO: using iterator trait possible?? should basically be Iterator<AdaptInfo> pub trait AdaptedFilesIter: Stream<Item = AdaptInfo> + Send + Unpin {}
pub trait AdaptedFilesIter: Send { impl<T> AdaptedFilesIter for T where T: Stream<Item = AdaptInfo> + Send + Unpin {}
// next takes a 'a-lived reference and returns an AdaptInfo that lives as long as the reference
fn next<'a>(&'a mut self) -> Option<AdaptInfo>;
}
/// A single AdaptInfo
pub struct SingleAdaptedFileAsIter {
ai: Option<AdaptInfo>,
}
impl SingleAdaptedFileAsIter {
pub fn new<'a>(ai: AdaptInfo) -> SingleAdaptedFileAsIter {
SingleAdaptedFileAsIter { ai: Some(ai) }
}
}
impl AdaptedFilesIter for SingleAdaptedFileAsIter {
fn next<'a>(&'a mut self) -> Option<AdaptInfo> {
self.ai.take()
}
}
pub type AdaptedFilesIterBox = Box<dyn AdaptedFilesIter>; pub type AdaptedFilesIterBox = Box<dyn AdaptedFilesIter>;

View File

@ -1,4 +1,3 @@
use crate::adapted_iter::SingleAdaptedFileAsIter;
use super::*; use super::*;
use anyhow::Result; use anyhow::Result;
@ -124,7 +123,7 @@ impl FileAdapter for SpawningFileAdapter {
.with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?; .with_context(|| format!("Could not set cmd arguments for {}", self.inner.get_exe()))?;
debug!("executing {:?}", cmd); debug!("executing {:?}", cmd);
let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?; let output = pipe_output(&line_prefix, cmd, inp, self.inner.get_exe(), "")?;
Ok(Box::new(SingleAdaptedFileAsIter::new(AdaptInfo { Ok(Box::new(tokio_stream::once(AdaptInfo {
filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable filepath_hint: PathBuf::from(format!("{}.txt", filepath_hint.to_string_lossy())), // TODO: customizable
inp: output, inp: output,
line_prefix, line_prefix,

View File

@ -1,4 +1,4 @@
use crate::adapted_iter::AdaptedFilesIterBox; use crate::adapted_iter::{AdaptedFilesIter, AdaptedFilesIterBox};
use crate::adapters::*; use crate::adapters::*;
use crate::caching_writer::async_read_and_write_to_cache; use crate::caching_writer::async_read_and_write_to_cache;
use crate::config::RgaConfig; use crate::config::RgaConfig;
@ -10,6 +10,7 @@ use crate::{
}; };
use anyhow::*; use anyhow::*;
use async_compression::tokio::bufread::ZstdDecoder; use async_compression::tokio::bufread::ZstdDecoder;
use async_stream::stream;
use log::*; use log::*;
use path_clean::PathClean; use path_clean::PathClean;
// use postproc::PostprocPrefix; // use postproc::PostprocPrefix;
@ -209,9 +210,21 @@ fn loop_adapt(
ai: AdaptInfo, ai: AdaptInfo,
) -> anyhow::Result<AdaptedFilesIterBox> { ) -> anyhow::Result<AdaptedFilesIterBox> {
let fph = ai.filepath_hint.clone(); let fph = ai.filepath_hint.clone();
let inp = adapter let inp = adapter.adapt(ai, &detection_reason).with_context(|| {
.adapt(ai, &detection_reason) format!(
.with_context(|| format!("adapting {} via {} failed", fph.to_string_lossy(), adapter.metadata().name))?; "adapting {} via {} failed",
fph.to_string_lossy(),
adapter.metadata().name
)
})?;
let s = stream! {
for await file in inp {
let (adapter, detection_reason) = choose_adapter(file.config, file.filepath_hint,file.archive_recursion_depth, file.inp);
for file in loop_adapt(adapter, detection_reason, file) {
yield file;
}
}
};
Ok(inp) Ok(inp)
} }

View File

@ -2,62 +2,18 @@ use tokio_util::io::{ReaderStream, StreamReader};
use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*};
use async_stream::stream; use async_stream::stream;
use tokio_stream::StreamExt;
pub struct RecursingConcattyReader {
inp: AdaptedFilesIterBox,
cur: Option<ReadBox>,
}
pub fn concat_read_streams( pub fn concat_read_streams(
mut input: AdaptedFilesIterBox, input: AdaptedFilesIterBox,
) -> ReadBox { ) -> ReadBox {
let s = stream! { let s = stream! {
while let Some(output) = input.next() { for await output in input {
let mut stream = ReaderStream::new(output.inp); let stream = ReaderStream::new(output.inp);
while let Some(bytes) = stream.next().await { for await bytes in stream {
yield bytes; yield bytes;
} }
} }
}; };
Box::pin(StreamReader::new(s)) Box::pin(StreamReader::new(s))
} }
/*
impl<'a> RecursingConcattyReader<'a> {
pub fn concat(inp: AdaptedFilesIterBox<'a>) -> anyhow::Result<ReadBox<'a>> {
let mut r = RecursingConcattyReader { inp, cur: None };
r.ascend()?;
Ok(Box::new(r))
}
pub fn ascend(&mut self) -> anyhow::Result<()> {
let inp = &mut self.inp;
// get next inner file from inp
// we only need to access the inp: ReadIter when the inner reader is done, so this should be safe
let ai = unsafe {
// would love to make this safe, but how? something like OwnedRef<inp, cur>
(*(inp as *mut AdaptedFilesIterBox<'a>)).next()
};
self.cur = match ai {
Some(ai) => Some(rga_preproc(ai)?),
None => None,
};
Ok(())
}
}
impl<'a> AsyncRead for RecursingConcattyReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match &mut self.cur {
None => Ok(0), // last file ended
Some(cur) => match cur.read(buf) {
Err(e) => Err(e),
Ok(0) => {
// current file ended, go to next file
self.ascend()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.read(buf)
}
Ok(n) => Ok(n),
},
}
}
}
*/