From 906043060bf1d62f32e2e2a9642dc02961285f65 Mon Sep 17 00:00:00 2001 From: phiresky Date: Sat, 29 Oct 2022 21:52:58 +0200 Subject: [PATCH] lint --- src/adapters/custom.rs | 2 +- src/adapters/spawning.rs | 22 ++++++++++------------ src/caching_writer.rs | 39 ++++++++++++++++++++++++--------------- src/preproc.rs | 12 ++++++------ src/preproc_cache.rs | 2 +- src/recurse.rs | 4 ---- src/test_utils.rs | 9 +++++---- 7 files changed, 47 insertions(+), 43 deletions(-) diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index ad40570..43b49ea 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -233,7 +233,7 @@ mod test { let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let r = adapter.adapt(a, &d)?; - let o = adapted_to_vec(r)?; + let o = adapted_to_vec(r).await?; assert_eq!( String::from_utf8(o)?, "PREFIX:hello world diff --git a/src/adapters/spawning.rs b/src/adapters/spawning.rs index c93c13d..8cf4161 100644 --- a/src/adapters/spawning.rs +++ b/src/adapters/spawning.rs @@ -10,7 +10,6 @@ use std::path::Path; use std::process::{ExitStatus, Stdio}; use std::task::Poll; use tokio::io::AsyncReadExt; -use tokio::process::Child; use tokio::process::Command; // TODO: don't separate the trait and the struct @@ -57,9 +56,9 @@ struct ProcWaitReader { } impl AsyncRead for ProcWaitReader { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, + _buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { match self.proce.as_mut().poll(cx) { std::task::Poll::Ready(x) => { @@ -101,9 +100,7 @@ pub fn pipe_output<'a>( let mut stdi = cmd.stdin.take().expect("is piped"); let stdo = cmd.stdout.take().expect("is piped"); - // TODO: deadlocks since this is run in the same thread as the thing reading from stdout of the process - tokio::task::spawn_local(async { - tokio::pin!(inp); + tokio::task::spawn_local(async move { tokio::io::copy(&mut inp, &mut stdi).await; }); @@ -120,12 +117,12 @@ impl FileAdapter for SpawningFileAdapter { ) -> Result> { let AdaptInfo { filepath_hint, - mut inp, + inp, line_prefix, archive_recursion_depth, postprocess, config, - is_real_file, + .. } = ai; let cmd = Command::new(self.inner.get_exe()); @@ -158,8 +155,8 @@ mod test { test_utils::{adapted_to_vec, simple_adapt_info}, }; - #[test] - fn streaming() { + #[tokio::test] + async fn streaming() -> anyhow::Result<()> { // an adapter that converts input line by line (deadlocks if the parent process tries to write everything and only then read it) let adapter = CustomAdapterConfig { name: "simple text replacer".to_string(), @@ -188,11 +185,12 @@ mod test { let input = format!("{0}{0}{0}{0}", input); let (a, d) = simple_adapt_info( &Path::new("foo.txt"), - Box::new(Cursor::new(input.as_bytes())), + Box::pin(Cursor::new(input.as_bytes())), ); let output = adapter.adapt(a, &d).unwrap(); - let oup = adapted_to_vec(output).unwrap(); + let oup = adapted_to_vec(output).await?; println!("output: {}", String::from_utf8_lossy(&oup)); + Ok(()) } } diff --git a/src/caching_writer.rs b/src/caching_writer.rs index 9adce24..f44ecdf 100644 --- a/src/caching_writer.rs +++ b/src/caching_writer.rs @@ -1,9 +1,12 @@ use std::{pin::Pin, task::Poll}; use anyhow::Result; -use log::*; -use tokio::{io::{AsyncRead, AsyncWriteExt, AsyncWrite}, pin}; use async_compression::tokio::write::ZstdEncoder; +use log::*; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + pin, +}; /** * wrap a writer so that it is passthrough, @@ -36,11 +39,14 @@ impl CachingReader { on_finish, }) } - pub fn finish(&mut self, cx: &mut std::task::Context<'_>) -> std::io::Result<(u64, Option>)> { + pub fn finish( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::io::Result<(u64, Option>)> { if let Some(writer) = self.zstd_writer.take() { pin!(writer); writer.as_mut().poll_shutdown(cx)?; - let res = writer.into_inner(); + let res = writer.get_pin_mut().clone(); // TODO: without copying possible? if res.len() <= self.max_cache_size { return Ok((self.bytes_written, Some(res))); } @@ -49,7 +55,7 @@ impl CachingReader { } async fn write_to_compressed(&mut self, buf: &[u8]) -> std::io::Result<()> { if let Some(writer) = self.zstd_writer.as_mut() { - let wrote = writer.write_all(buf).await?; + writer.write_all(buf).await?; let compressed_len = writer.get_ref().len(); trace!("wrote {} to zstd, len now {}", buf.len(), compressed_len); if compressed_len > self.max_cache_size { @@ -61,17 +67,19 @@ impl CachingReader { Ok(()) } } -impl AsyncRead for CachingReader where R: AsyncRead { - +impl AsyncRead for CachingReader +where + R: AsyncRead, +{ fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, + mut buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - let old_filled = buf.filled(); + let old_filled_len = buf.filled().len(); match self.inp.as_mut().poll_read(cx, &mut buf) { /*Ok(0) => { - + } Ok(read_bytes) => { self.write_to_compressed(&buf[0..read_bytes])?; @@ -81,22 +89,23 @@ impl AsyncRead for CachingReader where R: AsyncRead { Poll::Ready(rdy) => { if let Ok(()) = &rdy { let slice = buf.filled(); - let read_bytes = slice.len() - old_filled.len(); + let read_bytes = slice.len() - old_filled_len; if read_bytes == 0 { // EOF // move out of box, replace with noop lambda - let on_finish = std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(()))); + let on_finish = + std::mem::replace(&mut self.on_finish, Box::new(|_| Ok(()))); // EOF, finish! (on_finish)(self.finish(cx)?) .map(|()| 0) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; } else { - self.write_to_compressed(&slice[old_filled.len()..]); + self.write_to_compressed(&slice[old_filled_len..]); self.bytes_written += read_bytes as u64; } } Poll::Ready(rdy) - }, + } Poll::Pending => Poll::Pending, } } diff --git a/src/preproc.rs b/src/preproc.rs index d80db15..03619a7 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -1,10 +1,10 @@ use crate::adapters::*; use crate::config::RgaConfig; +use crate::matching::*; use crate::recurse::concat_read_streams; -use crate::{matching::*, recurse::RecursingConcattyReader}; use crate::{ preproc_cache::{LmdbCache, PreprocCache}, - print_bytes, print_dur, CachingReader, + print_bytes, CachingReader, }; use anyhow::*; use log::*; @@ -12,19 +12,19 @@ use path_clean::PathClean; // use postproc::PostprocPrefix; use std::convert::TryInto; use std::path::Path; +use tokio::io::AsyncBufRead; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; -use tokio::io::{AsyncBufRead, AsyncRead}; -use std::{rc::Rc, time::Instant}; +use std::rc::Rc; -type ActiveAdapters = Vec<(Rc)>; +type ActiveAdapters = Vec>; async fn choose_adapter( config: &RgaConfig, filepath_hint: &Path, archive_recursion_depth: i32, - mut inp: &mut (impl AsyncBufRead + Unpin), + inp: &mut (impl AsyncBufRead + Unpin), ) -> Result, FileMatcher, ActiveAdapters)>> { let active_adapters = get_adapters_filtered(config.custom_adapters.clone(), &config.adapters)?; let adapters = adapter_matcher(&active_adapters, config.accurate)?; diff --git a/src/preproc_cache.rs b/src/preproc_cache.rs index c6b4037..d61aa08 100644 --- a/src/preproc_cache.rs +++ b/src/preproc_cache.rs @@ -25,7 +25,7 @@ fn open_cache_db( path: &Path, ) -> Result>>> { std::fs::create_dir_all(path)?; - use rkv::backend::LmdbEnvironmentFlags; + // use rkv::backend::LmdbEnvironmentFlags; rkv::Manager::::singleton() .write() diff --git a/src/recurse.rs b/src/recurse.rs index 517c3e5..ba8b055 100644 --- a/src/recurse.rs +++ b/src/recurse.rs @@ -1,11 +1,7 @@ -use tokio::io::AsyncRead; use tokio_util::io::{ReaderStream, StreamReader}; -use crate::preproc::rga_preproc; use crate::{adapted_iter::AdaptedFilesIterBox, adapters::*}; use async_stream::stream; -use tokio_stream::Stream; -use bytes::Bytes; use tokio_stream::StreamExt; pub struct RecursingConcattyReader<'a> { diff --git a/src/test_utils.rs b/src/test_utils.rs index 3ab4cc2..6d9efc2 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -3,10 +3,11 @@ use crate::{ adapters::{AdaptInfo, ReadBox}, config::RgaConfig, matching::{FastFileMatcher, FileMatcher}, - recurse::RecursingConcattyReader, + recurse::concat_read_streams, }; use anyhow::Result; use std::path::{Path, PathBuf}; +use tokio::io::AsyncReadExt; pub fn test_data_dir() -> PathBuf { let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -32,10 +33,10 @@ pub fn simple_adapt_info<'a>(filepath: &Path, inp: ReadBox<'a>) -> (AdaptInfo<'a ) } -pub fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result> { - let mut res = RecursingConcattyReader::concat(adapted)?; +pub async fn adapted_to_vec(adapted: AdaptedFilesIterBox<'_>) -> Result> { + let mut res = concat_read_streams(adapted); let mut buf = Vec::new(); - res.read_to_end(&mut buf)?; + res.read_to_end(&mut buf).await?; Ok(buf) }