From b8cca5477011afa827aa23adda465975fa59ef1e Mon Sep 17 00:00:00 2001 From: phiresky Date: Tue, 21 Feb 2023 11:43:17 +0100 Subject: [PATCH] binary detection and utf16 --- src/adapters.rs | 9 ++++- src/adapters/custom.rs | 7 ++-- src/adapters/decompress.rs | 11 ++++-- src/adapters/postproc.rs | 77 ++++++++++++++++++++++++-------------- src/adapters/sqlite.rs | 2 +- src/adapters/tar.rs | 10 ++++- src/adapters/writing.rs | 3 +- src/adapters/zip.rs | 14 ++++--- src/preproc.rs | 31 ++++++++++----- 9 files changed, 111 insertions(+), 53 deletions(-) diff --git a/src/adapters.rs b/src/adapters.rs index 7f97050..babaf6f 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -9,6 +9,7 @@ pub mod writing; pub mod zip; use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*}; use anyhow::{format_err, Context, Result}; +use async_trait::async_trait; use custom::CustomAdapterConfig; use custom::BUILTIN_SPAWNING_ADAPTERS; use log::*; @@ -76,11 +77,17 @@ impl AdapterMeta { pub trait GetMetadata { fn metadata(&self) -> &AdapterMeta; } + +#[async_trait] pub trait FileAdapter: GetMetadata + Send + Sync { /// adapt a file. /// /// detection_reason is the Matcher that was used to identify this file. Unless --rga-accurate was given, it is always a FastMatcher - fn adapt(&self, a: AdaptInfo, detection_reason: &FileMatcher) -> Result; + async fn adapt( + &self, + a: AdaptInfo, + detection_reason: &FileMatcher, + ) -> Result; } pub struct AdaptInfo { diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs index 89d60b5..d1c06c1 100644 --- a/src/adapters/custom.rs +++ b/src/adapters/custom.rs @@ -224,8 +224,9 @@ impl CustomSpawningFileAdapter { Ok(command) } } +#[async_trait] impl FileAdapter for CustomSpawningFileAdapter { - fn adapt<'a>( + async fn adapt( &self, ai: AdaptInfo, _detection_reason: &FileMatcher, @@ -314,7 +315,7 @@ mod test { let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); // let r = adapter.adapt(a, &d)?; - let r = loop_adapt(&adapter, d, a)?; + let r = loop_adapt(&adapter, d, a).await?; let o = adapted_to_vec(r).await?; assert_eq!( String::from_utf8(o)?, @@ -368,7 +369,7 @@ PREFIX:Page 1: Path::new("foo.txt"), Box::pin(Cursor::new(Vec::from(input))), ); - let output = adapter.adapt(a, &d).unwrap(); + let output = adapter.adapt(a, &d).await.unwrap(); let oup = adapted_to_vec(output).await?; println!("output: {}", String::from_utf8_lossy(&oup)); diff --git a/src/adapters/decompress.rs b/src/adapters/decompress.rs index fe5b6c1..da28913 100644 --- a/src/adapters/decompress.rs +++ b/src/adapters/decompress.rs @@ -93,8 +93,13 @@ fn get_inner_filename(filename: &Path) -> PathBuf { filename.with_file_name(format!("{}{}", stem, new_extension)) } +#[async_trait] impl FileAdapter for DecompressAdapter { - fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result { + async fn adapt( + &self, + ai: AdaptInfo, + detection_reason: &FileMatcher, + ) -> Result { Ok(one_file(AdaptInfo { filepath_hint: get_inner_filename(&ai.filepath_hint), is_real_file: false, @@ -137,7 +142,7 @@ mod tests { let filepath = test_data_dir().join("hello.gz"); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); - let r = adapter.adapt(a, &d)?; + let r = adapter.adapt(a, &d).await?; let o = adapted_to_vec(r).await?; assert_eq!(String::from_utf8(o)?, "hello\n"); Ok(()) @@ -150,7 +155,7 @@ mod tests { let filepath = test_data_dir().join("short.pdf.gz"); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); - let r = loop_adapt(&adapter, d, a)?; + let r = loop_adapt(&adapter, d, a).await?; let o = adapted_to_vec(r).await?; assert_eq!( String::from_utf8(o)?, diff --git a/src/adapters/postproc.rs b/src/adapters/postproc.rs index fecbf10..3b42cca 100644 --- a/src/adapters/postproc.rs +++ b/src/adapters/postproc.rs @@ -4,6 +4,7 @@ use anyhow::Result; use async_stream::stream; +use async_trait::async_trait; use bytes::Bytes; use encoding_rs::Encoding; use encoding_rs_io::DecodeReaderBytesBuilder; @@ -44,8 +45,9 @@ impl GetMetadata for PostprocPrefix { &METADATA } } +#[async_trait] impl FileAdapter for PostprocPrefix { - fn adapt<'a>( + async fn adapt( &self, a: super::AdaptInfo, _detection_reason: &crate::matching::FileMatcher, @@ -87,18 +89,12 @@ async fn postproc_encoding( let mut beginning = inp.take(1 << 13); beginning.read_to_end(&mut fourk).await?; + let has_binary = fourk.contains(&0u8); - if fourk.contains(&0u8) { - log::debug!("detected binary"); - let v = "[rga: binary data]"; - return Ok(Box::pin(std::io::Cursor::new(v))); - } let enc = Encoding::for_bom(&fourk); - let inp = std::io::Cursor::new(fourk).chain(beginning.into_inner()); + let inp = Cursor::new(fourk).chain(beginning.into_inner()); match enc { - None => Ok(Box::pin(inp)), - Some((enc, _)) if enc == encoding_rs::UTF_8 => Ok(Box::pin(inp)), - Some(_) => { + Some((enc, _)) if enc != encoding_rs::UTF_8 => { // detected UTF16LE or UTF16BE, convert to UTF8 in separate thread // TODO: parse these options from ripgrep's configuration let encoding = None; // detect bom but usually assume utf8 @@ -120,7 +116,14 @@ async fn postproc_encoding( Ok(oup) }) .await??; - Ok(Box::pin(std::io::Cursor::new(oup))) + Ok(Box::pin(Cursor::new(oup))) + } + _ => { + if has_binary { + log::debug!("detected binary"); + return Ok(Box::pin(Cursor::new("[rga: binary data]"))); + } + Ok(Box::pin(inp)) } } } @@ -169,13 +172,14 @@ impl GetMetadata for PostprocPageBreaks { &METADATA } } +#[async_trait] impl FileAdapter for PostprocPageBreaks { - fn adapt<'a>( + async fn adapt( &self, a: super::AdaptInfo, _detection_reason: &crate::matching::FileMatcher, ) -> Result { - let read = postproc_pagebreaks(postproc_encoding(&a.line_prefix, a.inp)?); + let read = postproc_pagebreaks(postproc_encoding(&a.line_prefix, a.inp).await?); // keep adapt info (filename etc) except replace inp let ai = AdaptInfo { inp: Box::pin(read), @@ -287,7 +291,7 @@ mod tests { let fname = test_data_dir().join("twoblankpages.pdf"); let rd = File::open(&fname).await?; let (a, d) = simple_adapt_info(&fname, Box::pin(rd)); - let res = loop_adapt(&adapter, d, a)?; + let res = loop_adapt(&adapter, d, a).await?; let buf = adapted_to_vec(res).await?; @@ -332,7 +336,8 @@ PREFIX:Page 3: b: &str, ) -> Result<()> { let mut oup = Vec::new(); - let inp = postproc_encoding("", a)?; + let inp = Box::pin(Cursor::new(a)); + let inp = postproc_encoding("", inp).await?; if pagebreaks { postproc_pagebreaks(inp).read_to_end(&mut oup).await?; } else { @@ -346,6 +351,23 @@ PREFIX:Page 3: Ok(()) } + #[tokio::test] + async fn test_utf16() -> Result<()> { + let utf16lebom: &[u8] = &[ + 0xff, 0xfe, 0x68, 0x00, 0x65, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, 0x00, + 0x77, 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x20, 0x00, 0x3d, 0xd8, + 0xa9, 0xdc, 0x0a, 0x00, + ]; + let utf16bebom: &[u8] = &[ + 0xfe, 0xff, 0x00, 0x68, 0x00, 0x65, 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, + 0x00, 0x77, 0x00, 0x6f, 0x00, 0x72, 0x00, 0x6c, 0x00, 0x64, 0x00, 0x20, 0xd8, 0x3d, + 0xdc, 0xa9, 0x00, 0x0a, + ]; + test_from_bytes(false, "", utf16lebom, "hello world 💩\n").await?; + test_from_bytes(false, "", utf16bebom, "hello world 💩\n").await?; + Ok(()) + } + #[tokio::test] async fn post1() -> Result<()> { let inp = "What is this\nThis is a test\nFoo"; @@ -367,20 +389,19 @@ PREFIX:Page 3: Ok(()) } - /* - todo: uncomment when fixed + #[tokio::test] - async fn test_binary_content() -> Result<()> { - test_from_strs( - false, - "foo:", - "this is a test \n\n \0 foo", - "foo:[rga: binary data]", - ) - .await?; - test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?; - Ok(()) - }*/ + async fn test_binary_content() -> Result<()> { + test_from_strs( + false, + "foo:", + "this is a test \n\n \0 foo", + "foo:[rga: binary data]", + ) + .await?; + test_from_strs(false, "foo:", "\0", "foo:[rga: binary data]").await?; + Ok(()) + } /*#[test] fn chardet() -> Result<()> { diff --git a/src/adapters/sqlite.rs b/src/adapters/sqlite.rs index 836b330..15541c7 100644 --- a/src/adapters/sqlite.rs +++ b/src/adapters/sqlite.rs @@ -137,7 +137,7 @@ mod test { let adapter: Box = Box::new(SqliteAdapter::default()); let fname = test_data_dir().join("hello.sqlite3"); let (a, d) = simple_fs_adapt_info(&fname).await?; - let res = adapter.adapt(a, &d)?; + let res = adapter.adapt(a, &d).await?; let buf = adapted_to_vec(res).await?; diff --git a/src/adapters/tar.rs b/src/adapters/tar.rs index 637829f..144bd20 100644 --- a/src/adapters/tar.rs +++ b/src/adapters/tar.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::*; use async_stream::stream; +use async_trait::async_trait; use lazy_static::lazy_static; use log::*; use std::path::PathBuf; @@ -45,8 +46,13 @@ impl GetMetadata for TarAdapter { } } +#[async_trait] impl FileAdapter for TarAdapter { - fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result { + async fn adapt( + &self, + ai: AdaptInfo, + _detection_reason: &FileMatcher, + ) -> Result { let AdaptInfo { filepath_hint, inp, @@ -103,7 +109,7 @@ mod tests { let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?)); let adapter = TarAdapter::new(); - let r = loop_adapt(&adapter, d, a).context("adapt")?; + let r = loop_adapt(&adapter, d, a).await.context("adapt")?; let o = adapted_to_vec(r).await.context("adapted_to_vec")?; assert_eq!( String::from_utf8(o).context("parsing utf8")?, diff --git a/src/adapters/writing.rs b/src/adapters/writing.rs index 548711c..433c59b 100644 --- a/src/adapters/writing.rs +++ b/src/adapters/writing.rs @@ -41,11 +41,12 @@ macro_rules! async_writeln { } pub(crate) use async_writeln; +#[async_trait] impl FileAdapter for T where T: WritingFileAdapter, { - fn adapt( + async fn adapt( &self, a: super::AdaptInfo, detection_reason: &crate::matching::FileMatcher, diff --git a/src/adapters/zip.rs b/src/adapters/zip.rs index e6ee1bc..43a1603 100644 --- a/src/adapters/zip.rs +++ b/src/adapters/zip.rs @@ -36,8 +36,13 @@ impl GetMetadata for ZipAdapter { } } +#[async_trait] impl FileAdapter for ZipAdapter { - fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result { + async fn adapt( + &self, + ai: AdaptInfo, + _detection_reason: &FileMatcher, + ) -> Result { // let (s, r) = mpsc::channel(1); let AdaptInfo { inp, @@ -52,8 +57,8 @@ impl FileAdapter for ZipAdapter { if is_real_file { use async_zip::read::fs::ZipFileReader; + let zip = ZipFileReader::new(&filepath_hint).await?; let s = stream! { - let zip = ZipFileReader::new(&filepath_hint).await?; for i in 0..zip.entries().len() { let reader = zip.entry_reader(i).await?; let file = reader.entry(); @@ -182,7 +187,6 @@ impl<'a> AdaptedFilesIter for ZipAdaptIter<'a> { #[cfg(test)] mod test { use async_zip::{write::ZipFileWriter, Compression, ZipEntryBuilder}; - use super::*; use crate::{preproc::loop_adapt, test_utils::*}; @@ -213,7 +217,7 @@ mod test { async fn only_seek_zip_fs() -> Result<()> { let zip = test_data_dir().join("only-seek-zip.zip"); let (a, d) = simple_fs_adapt_info(&zip).await?; - let _v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a)?).await?; + let _v = adapted_to_vec(loop_adapt(&ZipAdapter::new(), d, a).await?).await?; // assert_eq!(String::from_utf8(v)?, ""); Ok(()) @@ -236,7 +240,7 @@ mod test { &PathBuf::from("outer.zip"), Box::pin(std::io::Cursor::new(zipfile)), ); - let buf = adapted_to_vec(loop_adapt(&adapter, d, a)?).await?; + let buf = adapted_to_vec(loop_adapt(&adapter, d, a).await?).await?; assert_eq!( String::from_utf8(buf)?, diff --git a/src/preproc.rs b/src/preproc.rs index 34f4059..1e6c2a7 100644 --- a/src/preproc.rs +++ b/src/preproc.rs @@ -11,11 +11,14 @@ use crate::{ use anyhow::*; use async_compression::tokio::bufread::ZstdDecoder; use async_stream::stream; +// use futures::future::{BoxFuture, FutureExt}; use log::*; use path_clean::PathClean; use postproc::PostprocPrefix; +use std::future::Future; use std::io::Cursor; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use tokio::io::AsyncBufRead; use tokio::io::AsyncBufReadExt; @@ -185,7 +188,7 @@ async fn adapt_caching( Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))), None => { debug!("cache MISS, running adapter with caching..."); - let inp = loop_adapt(adapter.as_ref(), detection_reason, ai)?; + let inp = loop_adapt(adapter.as_ref(), detection_reason, ai).await?; let inp = concat_read_streams(inp); let inp = async_read_and_write_to_cache( inp, @@ -213,15 +216,25 @@ pub fn loop_adapt( adapter: &dyn FileAdapter, detection_reason: FileMatcher, ai: AdaptInfo, +) -> Pin> + Send + '_>> { + Box::pin(async move { loop_adapt_inner(adapter, detection_reason, ai).await }) +} +pub async fn loop_adapt_inner( + adapter: &dyn FileAdapter, + detection_reason: FileMatcher, + ai: AdaptInfo, ) -> anyhow::Result { let fph = ai.filepath_hint.clone(); - let inp = adapter.adapt(ai, &detection_reason).with_context(|| { - format!( - "adapting {} via {} failed", - fph.to_string_lossy(), - adapter.metadata().name - ) - })?; + let inp = adapter + .adapt(ai, &detection_reason) + .await + .with_context(|| { + format!( + "adapting {} via {} failed", + fph.to_string_lossy(), + adapter.metadata().name + ) + })?; let s = stream! { for await file in inp { match buf_choose_adapter(file?).await? { @@ -243,7 +256,7 @@ pub fn loop_adapt( ai.filepath_hint.to_string_lossy(), &adapter.metadata().name ); - for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai)? { + for await ifile in loop_adapt(adapter.as_ref(), detection_reason, ai).await? { yield ifile; } }