readd decompress adapter

This commit is contained in:
phiresky 2022-12-25 20:00:47 +01:00
parent 8e84288a55
commit 9ce11656d1
4 changed files with 137 additions and 73 deletions

110
Cargo.lock generated
View File

@ -40,6 +40,21 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]] [[package]]
name = "ansi_term" name = "ansi_term"
version = "0.12.1" version = "0.12.1"
@ -67,10 +82,18 @@ version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
dependencies = [ dependencies = [
"brotli",
"bytes 0.5.6",
"bzip2",
"flate2",
"futures-core", "futures-core",
"futures-io",
"memchr", "memchr",
"pin-project-lite", "pin-project-lite 0.2.9",
"tokio", "tokio 0.2.25",
"tokio 0.3.7",
"tokio 1.23.0",
"xz2",
"zstd", "zstd",
"zstd-safe", "zstd-safe",
] ]
@ -143,6 +166,27 @@ dependencies = [
"generic-array 0.14.6", "generic-array 0.14.6",
] ]
[[package]]
name = "brotli"
version = "3.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]] [[package]]
name = "bytecount" name = "bytecount"
version = "0.6.3" version = "0.6.3"
@ -155,6 +199,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.3.0" version = "1.3.0"
@ -599,7 +649,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-macro", "futures-macro",
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite 0.2.9",
"pin-utils", "pin-utils",
"slab", "slab",
] ]
@ -1100,6 +1150,12 @@ dependencies = [
"indexmap", "indexmap",
] ]
[[package]]
name = "pin-project-lite"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.9" version = "0.2.9"
@ -1242,8 +1298,7 @@ dependencies = [
"async-compression", "async-compression",
"async-stream", "async-stream",
"bincode", "bincode",
"bytes", "bytes 1.3.0",
"bzip2",
"clap 4.0.32", "clap 4.0.32",
"crossbeam", "crossbeam",
"crossbeam-channel", "crossbeam-channel",
@ -1255,7 +1310,6 @@ dependencies = [
"encoding_rs", "encoding_rs",
"encoding_rs_io", "encoding_rs_io",
"env_logger", "env_logger",
"flate2",
"glob", "glob",
"lazy_static", "lazy_static",
"log", "log",
@ -1274,14 +1328,12 @@ dependencies = [
"structopt", "structopt",
"tar", "tar",
"tempfile", "tempfile",
"tokio", "tokio 1.23.0",
"tokio-stream", "tokio-stream",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"tree_magic_mini", "tree_magic_mini",
"xz2",
"zip", "zip",
"zstd",
] ]
[[package]] [[package]]
@ -1663,6 +1715,26 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
dependencies = [
"bytes 0.5.6",
"pin-project-lite 0.1.12",
]
[[package]]
name = "tokio"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46409491c9375a693ce7032101970a54f8a2010efb77e13f70788f0d84489e39"
dependencies = [
"autocfg",
"pin-project-lite 0.2.9",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.23.0" version = "1.23.0"
@ -1670,13 +1742,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes 1.3.0",
"libc", "libc",
"memchr", "memchr",
"mio", "mio",
"num_cpus", "num_cpus",
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite 0.2.9",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
@ -1701,8 +1773,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite 0.2.9",
"tokio", "tokio 1.23.0",
"tokio-util", "tokio-util",
] ]
@ -1713,9 +1785,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"bytes", "bytes 1.3.0",
"futures-core", "futures-core",
"tokio", "tokio 1.23.0",
"tokio-stream", "tokio-stream",
] ]
@ -1725,15 +1797,15 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [ dependencies = [
"bytes", "bytes 1.3.0",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"hashbrown", "hashbrown",
"pin-project-lite", "pin-project-lite 0.2.9",
"slab", "slab",
"tokio", "tokio 1.23.0",
"tracing", "tracing",
] ]
@ -1744,7 +1816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"pin-project-lite", "pin-project-lite 0.2.9",
"tracing-core", "tracing-core",
] ]

View File

@ -17,11 +17,10 @@ version = "0.9.7-alpha.0"
[dependencies] [dependencies]
anyhow = "1.0.32" anyhow = "1.0.32"
async-compression = {version = "0.3.15", features = ["tokio", "zstd"]} async-compression = {version = "0.3.15", features = ["all", "all-algorithms", "tokio"]}
async-stream = "0.3.3" async-stream = "0.3.3"
bincode = "1.3.1" bincode = "1.3.1"
bytes = "1.2.1" bytes = "1.2.1"
bzip2 = "0.4.1"
clap = {version = "4.0.18", features = ["wrap_help"]} clap = {version = "4.0.18", features = ["wrap_help"]}
crossbeam = "0.8.1" crossbeam = "0.8.1"
crossbeam-channel = "0.5.1" crossbeam-channel = "0.5.1"
@ -32,7 +31,6 @@ dyn-clone = "1.0.2"
encoding_rs = "0.8.24" encoding_rs = "0.8.24"
encoding_rs_io = "0.1.7" encoding_rs_io = "0.1.7"
env_logger = "0.9.0" env_logger = "0.9.0"
flate2 = "1.0.14"
glob = "0.3.0" glob = "0.3.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
log = "0.4.11" log = "0.4.11"
@ -54,9 +52,7 @@ tokio = {version = "1.21.2", features = ["full"]}
tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]} tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]}
tokio-util = {version = "0.7.4", features = ["io", "full"]} tokio-util = {version = "0.7.4", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.0.0"} tree_magic = {package = "tree_magic_mini", version = "3.0.0"}
xz2 = "0.1.6"
zip = "0.6.3" zip = "0.6.3"
zstd = "0.11.2"
[dev-dependencies] [dev-dependencies]
ctor = "0.1.20" ctor = "0.1.20"

View File

@ -1,5 +1,5 @@
pub mod custom; pub mod custom;
// pub mod decompress; pub mod decompress;
// pub mod ffmpeg; // pub mod ffmpeg;
pub mod postproc; pub mod postproc;
// pub mod pdfpages; // pub mod pdfpages;
@ -117,7 +117,7 @@ pub fn get_all_adapters(custom_adapters: Option<Vec<CustomAdapterConfig>>) -> Ad
Arc::new(PostprocPageBreaks::default()), Arc::new(PostprocPageBreaks::default()),
//Rc::new(ffmpeg::FFmpegAdapter::new()), //Rc::new(ffmpeg::FFmpegAdapter::new()),
// Rc::new(zip::ZipAdapter::new()), // Rc::new(zip::ZipAdapter::new()),
//Rc::new(decompress::DecompressAdapter::new()), Arc::new(decompress::DecompressAdapter::new()),
// Rc::new(tar::TarAdapter::new()), // Rc::new(tar::TarAdapter::new()),
//Rc::new(sqlite::SqliteAdapter::new()), //Rc::new(sqlite::SqliteAdapter::new()),
// Rc::new(pdfpages::PdfPagesAdapter::new()), // Rc::new(pdfpages::PdfPagesAdapter::new()),

View File

@ -1,9 +1,10 @@
use super::*; use super::*;
use crate::preproc::rga_preproc;
use anyhow::Result; use anyhow::Result;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::io::BufReader;
use std::path::PathBuf; use std::path::{Path, PathBuf};
static EXTENSIONS: &[&str] = &["tgz", "tbz", "tbz2", "gz", "bz2", "xz", "zst"]; static EXTENSIONS: &[&str] = &["tgz", "tbz", "tbz2", "gz", "bz2", "xz", "zst"];
static MIME_TYPES: &[&str] = &[ static MIME_TYPES: &[&str] = &[
@ -49,26 +50,27 @@ impl GetMetadata for DecompressAdapter {
} }
fn decompress_any(reason: &FileMatcher, inp: ReadBox) -> Result<ReadBox> { fn decompress_any(reason: &FileMatcher, inp: ReadBox) -> Result<ReadBox> {
use async_compression::tokio::bufread;
use FastFileMatcher::*; use FastFileMatcher::*;
use FileMatcher::*; use FileMatcher::*;
let gz = |inp: ReadBox| Box::new(flate2::read::MultiGzDecoder::new(inp)); let gz = |inp: ReadBox| Box::pin(bufread::GzipDecoder::new(BufReader::new(inp)));
let bz2 = |inp: ReadBox| Box::new(bzip2::read::BzDecoder::new(inp)); let bz2 = |inp: ReadBox| Box::pin(bufread::BzDecoder::new(BufReader::new(inp)));
let xz = |inp: ReadBox| Box::new(xz2::read::XzDecoder::new_multi_decoder(inp)); let xz = |inp: ReadBox| Box::pin(bufread::XzDecoder::new(BufReader::new(inp)));
let zst = |inp: ReadBox| zstd::stream::read::Decoder::new(inp); // returns result let zst = |inp: ReadBox| Box::pin(bufread::ZstdDecoder::new(BufReader::new(inp)));
Ok(match reason { Ok(match reason {
Fast(FileExtension(ext)) => match ext.as_ref() { Fast(FileExtension(ext)) => match ext.as_ref() {
"tgz" | "gz" => gz(inp), "tgz" | "gz" => gz(inp),
"tbz" | "tbz2" | "bz2" => bz2(inp), "tbz" | "tbz2" | "bz2" => bz2(inp),
"xz" => xz(inp), "xz" => xz(inp),
"zst" => Box::new(zst(inp)?), "zst" => zst(inp),
ext => Err(format_err!("don't know how to decompress {}", ext))?, ext => Err(format_err!("don't know how to decompress {}", ext))?,
}, },
MimeType(mime) => match mime.as_ref() { MimeType(mime) => match mime.as_ref() {
"application/gzip" => gz(inp), "application/gzip" => gz(inp),
"application/x-bzip" => bz2(inp), "application/x-bzip" => bz2(inp),
"application/x-xz" => xz(inp), "application/x-xz" => xz(inp),
"application/zstd" => Box::new(zst(inp)?), "application/zstd" => zst(inp),
mime => Err(format_err!("don't know how to decompress mime {}", mime))?, mime => Err(format_err!("don't know how to decompress mime {}", mime))?,
}, },
}) })
@ -76,13 +78,13 @@ fn decompress_any(reason: &FileMatcher, inp: ReadBox) -> Result<ReadBox> {
fn get_inner_filename(filename: &Path) -> PathBuf { fn get_inner_filename(filename: &Path) -> PathBuf {
let extension = filename let extension = filename
.extension() .extension()
.map(|e| e.to_string_lossy().to_owned()) .map(|e| e.to_string_lossy())
.unwrap_or(Cow::Borrowed("")); .unwrap_or(Cow::Borrowed(""));
let stem = filename let stem = filename
.file_stem() .file_stem()
.expect("no filename given?") .expect("no filename given?")
.to_string_lossy(); .to_string_lossy();
let new_extension = match extension.to_owned().as_ref() { let new_extension = match extension.as_ref() {
"tgz" | "tbz" | "tbz2" => ".tar", "tgz" | "tbz" | "tbz2" => ".tar",
_other => "", _other => "",
}; };
@ -90,33 +92,27 @@ fn get_inner_filename(filename: &Path) -> PathBuf {
} }
impl FileAdapter for DecompressAdapter { impl FileAdapter for DecompressAdapter {
fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result<ReadBox> { fn adapt(&self, ai: AdaptInfo, detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
let AdaptInfo { Ok(Box::pin(tokio_stream::once(AdaptInfo {
filepath_hint, filepath_hint: get_inner_filename(&ai.filepath_hint),
inp,
line_prefix,
archive_recursion_depth,
config,
..
} = ai;
let ai2: AdaptInfo = AdaptInfo {
filepath_hint: get_inner_filename(&filepath_hint),
is_real_file: false, is_real_file: false,
archive_recursion_depth: archive_recursion_depth + 1, archive_recursion_depth: ai.archive_recursion_depth + 1,
inp: decompress_any(detection_reason, inp)?, inp: decompress_any(detection_reason, ai.inp)?,
line_prefix, line_prefix: ai.line_prefix,
config: config.clone(), config: ai.config.clone(),
}; postprocess: ai.postprocess,
rga_preproc(ai2) })))
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::preproc::loop_adapt;
use crate::test_utils::*; use crate::test_utils::*;
use std::fs::File; use pretty_assertions::assert_eq;
use tokio::fs::File;
#[test] #[test]
fn test_inner_filename() { fn test_inner_filename() {
for (a, b) in &[ for (a, b) in &[
@ -132,38 +128,38 @@ mod tests {
} }
} }
#[test] #[tokio::test]
fn gz() -> Result<()> { async fn gz() -> Result<()> {
let adapter = DecompressAdapter; let adapter = DecompressAdapter;
let filepath = test_data_dir().join("hello.gz"); let filepath = test_data_dir().join("hello.gz");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let mut r = adapter.adapt(a, &d)?; let r = adapter.adapt(a, &d)?;
let mut o = Vec::new(); let o = adapted_to_vec(r).await?;
r.read_to_end(&mut o)?;
assert_eq!(String::from_utf8(o)?, "hello\n"); assert_eq!(String::from_utf8(o)?, "hello\n");
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn pdf_gz() -> Result<()> { async fn pdf_gz() -> Result<()> {
let adapter = DecompressAdapter; let adapter = DecompressAdapter;
let filepath = test_data_dir().join("short.pdf.gz"); let filepath = test_data_dir().join("short.pdf.gz");
let (a, d) = simple_adapt_info(&filepath, Box::new(File::open(&filepath)?)); let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let mut r = adapter.adapt(a, &d)?; let r = loop_adapt(&adapter, d, a)?;
let mut o = Vec::new(); let o = adapted_to_vec(r).await?;
r.read_to_end(&mut o)?;
assert_eq!( assert_eq!(
String::from_utf8(o)?, String::from_utf8(o)?,
"hello world "PREFIX:Page 1:hello world
this is just a test. PREFIX:Page 1:this is just a test.
PREFIX:Page 1:
1 PREFIX:Page 1:1
PREFIX:Page 1:
\u{c}" PREFIX:Page 1:
PREFIX:Page 2:
"
); );
Ok(()) Ok(())
} }