This commit is contained in:
phiresky 2022-10-29 21:52:58 +02:00
parent cde0e209d2
commit 906043060b
7 changed files with 47 additions and 43 deletions

View File

@ -233,7 +233,7 @@ mod test {
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = adapter.adapt(a, &d)?; let r = adapter.adapt(a, &d)?;
let o = adapted_to_vec(r)?; let o = adapted_to_vec(r).await?;
assert_eq!( assert_eq!(
String::from_utf8(o)?, String::from_utf8(o)?,
"PREFIX:hello world "PREFIX:hello world

View File

@ -10,7 +10,6 @@ use std::path::Path;
use std::process::{ExitStatus, Stdio}; use std::process::{ExitStatus, Stdio};
use std::task::Poll; use std::task::Poll;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::process::Child;
use tokio::process::Command; use tokio::process::Command;
// TODO: don't separate the trait and the struct // TODO: don't separate the trait and the struct
@ -57,9 +56,9 @@ struct ProcWaitReader {
} }
impl AsyncRead for ProcWaitReader { impl AsyncRead for ProcWaitReader {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, _buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> { ) -> std::task::Poll<std::io::Result<()>> {
match self.proce.as_mut().poll(cx) { match self.proce.as_mut().poll(cx) {
std::task::Poll::Ready(x) => { std::task::Poll::Ready(x) => {
@ -101,9 +100,7 @@ pub fn pipe_output<'a>(
let mut stdi = cmd.stdin.take().expect("is piped"); let mut stdi = cmd.stdin.take().expect("is piped");
let stdo = cmd.stdout.take().expect("is piped"); let stdo = cmd.stdout.take().expect("is piped");
// TODO: deadlocks since this is run in the same thread as the thing reading from stdout of the process tokio::task::spawn_local(async move {
tokio::task::spawn_local(async {
tokio::pin!(inp);
tokio::io::copy(&mut inp, &mut stdi).await; tokio::io::copy(&mut inp, &mut stdi).await;
}); });
@ -120,12 +117,12 @@ impl FileAdapter for SpawningFileAdapter {
) -> Result<AdaptedFilesIterBox<'a>> { ) -> Result<AdaptedFilesIterBox<'a>> {
let AdaptInfo { let AdaptInfo {
filepath_hint, filepath_hint,
mut inp, inp,
line_prefix, line_prefix,
archive_recursion_depth, archive_recursion_depth,
postprocess, postprocess,
config, config,
is_real_file, ..
} = ai; } = ai;
let cmd = Command::new(self.inner.get_exe()); let cmd = Command::new(self.inner.get_exe());
@ -158,8 +155,8 @@ mod test {
test_utils::{adapted_to_vec, simple_adapt_info}, test_utils::{adapted_to_vec, simple_adapt_info},
}; };
#[test] #[tokio::test]
fn streaming() { async fn streaming() -> anyhow::Result<()> {
// an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it) // an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it)
let adapter = CustomAdapterConfig { let adapter = CustomAdapterConfig {
name: "simple text replacer".to_string(), name: "simple text replacer".to_string(),
@ -188,11 +185,12 @@ mod test {
let input = format!("{0}{0}{0}{0}", input); let input = format!("{0}{0}{0}{0}", input);
let (a, d) = simple_adapt_info( let (a, d) = simple_adapt_info(
&Path::new("foo.txt"), &Path::new("foo.txt"),
Box::new(Cursor::new(input.as_bytes())), Box::pin(Cursor::new(input.as_bytes())),
); );
let output = adapter.adapt(a, &d).unwrap(); let output = adapter.adapt(a, &d).unwrap();
let oup = adapted_to_vec(output).unwrap(); let oup = adapted_to_vec(output).await?;
println!("output: {}", String::from_utf8_lossy(&oup)); println!("output: {}", String::from_utf8_lossy(&oup));
Ok(())
} }
} }

View File

@ -1,9 +1,12 @@
use std::{pin::Pin, task::Poll}; use std::{pin::Pin, task::Poll};
use anyhow::Result; use anyhow::Result;
use log::*;
use tokio::{io::{AsyncRead, AsyncWriteExt, AsyncWrite}, pin};
use async_compression::tokio::write::ZstdEncoder; use async_compression::tokio::write::ZstdEncoder;
use log::*;
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
pin,
};
/** /**
* wrap a writer so that it is passthrough, * wrap a writer so that it is passthrough,
@ -36,11 +39,14 @@ impl<R: AsyncRead> CachingReader<R> {
on_finish, on_finish,
}) })
} }
pub fn finish(&mut self, cx: &mut std::task::Context<'_>) -> std::io::Result<(u64, Option<Vec<u8>>)> { pub fn finish(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::io::Result<(u64, Option<Vec<u8>>)> {
if let Some(writer) = self.zstd_writer.take() { if let Some(writer) = self.zstd_writer.take() {
pin!(writer); pin!(writer);
writer.as_mut().poll_shutdown(cx)?; writer.as_mut().poll_shutdown(cx)?;
let res = writer.into_inner(); let res = writer.get_pin_mut().clone(); // TODO: without copying possible?
if res.len() <= self.max_cache_size { if res.len() <= self.max_cache_size {
return Ok((self.bytes_written, Some(res))); return Ok((self.bytes_written, Some(res)));
} }
@ -49,7 +55,7 @@ impl<R: AsyncRead> CachingReader<R> {
} }
async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> { async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> {
if let Some(writer) = self.zstd_writer.as_mut() { if let Some(writer) = self.zstd_writer.as_mut() {
let wrote = writer.write_all(buf).await?; writer.write_all(buf).await?;
let compressed_len = writer.get_ref().len(); let compressed_len = writer.get_ref().len();
trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len); trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len);
if compressed_len > self.max_cache_size { if compressed_len > self.max_cache_size {
@ -61,17 +67,19 @@ impl<R: AsyncRead> CachingReader<R> {
Ok(()) Ok(())
} }
} }
impl<R> AsyncRead for CachingReader<R> where R: AsyncRead { impl<R> AsyncRead for CachingReader<R>
where
R: AsyncRead,
{
fn poll_read( fn poll_read(
self: std::pin::Pin<&mut Self>, mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, mut buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> { ) -> std::task::Poll<std::io::Result<()>> {
let old_filled = buf.filled(); let old_filled_len = buf.filled().len();
match self.inp.as_mut().poll_read(cx, &mut buf) { match self.inp.as_mut().poll_read(cx, &mut buf) {
/*Ok(0) => { /*Ok(0) => {
} }
Ok(read_bytes) => { Ok(read_bytes) => {
self.write_to_compressed(&buf[0..read_bytes])?; self.write_to_compressed(&buf[0..read_bytes])?;
@ -81,22 +89,23 @@ impl<R> AsyncRead for CachingReader<R> where R: AsyncRead {
Poll::Ready(rdy) => { Poll::Ready(rdy) => {
if let Ok(()) = &rdy { if let Ok(()) = &rdy {
let slice = buf.filled(); let slice = buf.filled();
let read_bytes = slice.len() - old_filled.len(); let read_bytes = slice.len() - old_filled_len;
if read_bytes == 0 { if read_bytes == 0 {
// EOF // EOF
// move out of box, replace with noop lambda // move out of box, replace with noop lambda
let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(()))); let on_finish =
std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(())));
// EOF, finish! // EOF, finish!
(on_finish)(self.finish(cx)?) (on_finish)(self.finish(cx)?)
.map(|()| 0) .map(|()| 0)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
} else { } else {
self.write_to_compressed(&slice[old_filled.len()..]); self.write_to_compressed(&slice[old_filled_len..]);
self.bytes_written += read_bytes as u64; self.bytes_written += read_bytes as u64;
} }
} }
Poll::Ready(rdy) Poll::Ready(rdy)
}, }
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
} }

View File

@ -1,10 +1,10 @@
use crate::adapters::*; use crate::adapters::*;
use crate::config::RgaConfig; use crate::config::RgaConfig;
use crate::matching::*;
use crate::recurse::concat_read_streams; use crate::recurse::concat_read_streams;
use crate::{matching::*, recurse::RecursingConcattyReader};
use crate::{ use crate::{
preproc_cache::{LmdbCache, PreprocCache}, preproc_cache::{LmdbCache, PreprocCache},
print_bytes, print_dur, CachingReader, print_bytes, CachingReader,
}; };
use anyhow::*; use anyhow::*;
use log::*; use log::*;
@ -12,19 +12,19 @@ use path_clean::PathClean;
// use postproc::PostprocPrefix; // use postproc::PostprocPrefix;
use std::convert::TryInto; use std::convert::TryInto;
use std::path::Path; use std::path::Path;
use tokio::io::AsyncBufRead;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader; use tokio::io::BufReader;
use tokio::io::{AsyncBufRead, AsyncRead};
use std::{rc::Rc, time::Instant}; use std::rc::Rc;
type ActiveAdapters = Vec<(Rc<dyn FileAdapter>)>; type ActiveAdapters = Vec<Rc<dyn FileAdapter>>;
async fn choose_adapter( async fn choose_adapter(
config: &RgaConfig, config: &RgaConfig,
filepath_hint: &Path, filepath_hint: &Path,
archive_recursion_depth: i32, archive_recursion_depth: i32,
mut inp: &mut (impl AsyncBufRead + Unpin), inp: &mut (impl AsyncBufRead + Unpin),
) -> Result<Option<(Rc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>> { ) -> Result<Option<(Rc<dyn FileAdapter>, FileMatcher, ActiveAdapters)>> {
let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?; let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?;
let adapters = adapter_matcher(&active_adapters, config.accurate)?; let adapters = adapter_matcher(&active_adapters, config.accurate)?;

View File

@ -25,7 +25,7 @@ fn open_cache_db(
path: &Path, path: &Path,
) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> { ) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> {
std::fs::create_dir_all(path)?; std::fs::create_dir_all(path)?;
use rkv::backend::LmdbEnvironmentFlags; // use rkv::backend::LmdbEnvironmentFlags;
rkv::Manager::<LmdbEnvironment>::singleton() rkv::Manager::<LmdbEnvironment>::singleton()
.write() .write()

View File

@ -1,11 +1,7 @@
use tokio::io::AsyncRead;
use tokio_util::io::{ReaderStream, StreamReader}; use tokio_util::io::{ReaderStream, StreamReader};
use crate::preproc::rga_preproc;
use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*};
use async_stream::stream; use async_stream::stream;
use tokio_stream::Stream;
use bytes::Bytes;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
pub struct RecursingConcattyReader<'a> { pub struct RecursingConcattyReader<'a> {

View File

@ -3,10 +3,11 @@ use crate::{
adapters::{AdaptInfo, ReadBox}, adapters::{AdaptInfo, ReadBox},
config::RgaConfig, config::RgaConfig,
matching::{FastFileMatcher, FileMatcher}, matching::{FastFileMatcher, FileMatcher},
recurse::RecursingConcattyReader, recurse::concat_read_streams,
}; };
use anyhow::Result; use anyhow::Result;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::io::AsyncReadExt;
pub fn test_data_dir() -> PathBuf { pub fn test_data_dir() -> PathBuf {
let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@ -32,10 +33,10 @@ pub fn simple_adapt_info<'a>(filepath: &Path, inp: ReadBox<'a>) -> (AdaptInfo<'a
) )
} }
pub fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result<Vec<u8>> { pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result<Vec<u8>> {
let mut res = RecursingConcattyReader::concat(adapted)?; let mut res = concat_read_streams(adapted);
let mut buf = Vec::new(); let mut buf = Vec::new();
res.read_to_end(&mut buf)?; res.read_to_end(&mut buf).await?;
Ok(buf) Ok(buf)
} }