From 82b303bb3d8e3579293c7afc361ced8930fc842e Mon Sep 17 00:00:00 2001
From: phiresky
Date: Mon, 26 Dec 2022 22:55:38 +0100
Subject: [PATCH] fix sqlite adapter

---
 src/adapters.rs        |   4 +-
 src/adapters/custom.rs |  15 +---
 src/adapters/sqlite.rs | 165 +++++++++++++++++++++++------------------
 src/lib.rs             |  14 ++++
 4 files changed, 112 insertions(+), 86 deletions(-)

diff --git a/src/adapters.rs b/src/adapters.rs
index 581c674..6887ad8 100644
--- a/src/adapters.rs
+++ b/src/adapters.rs
@@ -3,12 +3,12 @@ pub mod decompress;
 // pub mod ffmpeg;
 pub mod postproc;
 use std::sync::Arc;
-// pub mod sqlite;
+pub mod sqlite;
 pub mod tar;
 // pub mod writing;
 // pub mod zip;
 use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*};
-use anyhow::*;
+use anyhow::{format_err, Context, Result};
 use custom::CustomAdapterConfig;
 use custom::BUILTIN_SPAWNING_ADAPTERS;
 use log::*;
diff --git a/src/adapters/custom.rs b/src/adapters/custom.rs
index 1bc5253..54bdca8 100644
--- a/src/adapters/custom.rs
+++ b/src/adapters/custom.rs
@@ -2,6 +2,7 @@
 use super::*;
 use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
 use crate::adapted_iter::one_file;
+use crate::join_handle_to_stream;
 use crate::{
     adapted_iter::AdaptedFilesIterBox,
     expand::expand_str_ez,
@@ -19,7 +20,7 @@
 use std::process::Stdio;
 use tokio::io::AsyncReadExt;
 use tokio::process::Child;
 use tokio::process::Command;
-use tokio::task::JoinHandle;
+
 use tokio_util::io::StreamReader;
 // mostly the same as AdapterMeta + SpawningFileAdapter
@@ -134,11 +135,11 @@ lazy_static! {
 
 /// replace a Command.spawn() error "File not found" with a more readable error
 /// to indicate some program is not installed
-pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error {
+pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> anyhow::Error {
     use std::io::ErrorKind::*;
     match err.kind() {
         NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help),
-        _ => Error::from(err),
+        _ => anyhow::Error::from(err),
     }
 }
 
@@ -156,15 +157,7 @@ fn proc_wait(mut child: Child) -> impl AsyncRead {
     };
     StreamReader::new(s)
 }
-/** returns an AsyncRead that is empty but returns an io error if the given task had an io error or join error */
-fn join_handle_to_stream(join: JoinHandle<std::io::Result<()>>) -> impl AsyncRead {
-    let st = stream! {
-        join.await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??;
-        yield std::io::Result::Ok(Bytes::copy_from_slice(b""))
-    };
-    StreamReader::new(st)
-}
 
 pub fn pipe_output(
     _line_prefix: &str,
     mut cmd: Command,
diff --git a/src/adapters/sqlite.rs b/src/adapters/sqlite.rs
index 2e42441..627b0e4 100644
--- a/src/adapters/sqlite.rs
+++ b/src/adapters/sqlite.rs
@@ -1,11 +1,18 @@
+use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err};
+
 use super::*;
 use anyhow::Result;
 use lazy_static::lazy_static;
 use log::*;
 use rusqlite::types::ValueRef;
 use rusqlite::*;
-use std::convert::TryInto;
-use writing::{WritingFileAdapter, WritingFileAdapterTrait};
+use std::{convert::TryInto, io::Cursor};
+use tokio::{
+    io::AsyncReadExt,
+    sync::mpsc::{self, Sender},
+};
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::io::StreamReader;
 
 static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"];
 
@@ -32,11 +39,6 @@ lazy_static! {
 
 #[derive(Default, Clone)]
 pub struct SqliteAdapter;
-impl SqliteAdapter {
-    pub fn new() -> WritingFileAdapter {
-        WritingFileAdapter::new(Box::new(SqliteAdapter {}))
-    }
-}
 impl GetMetadata for SqliteAdapter {
     fn metadata(&self) -> &AdapterMeta {
         &METADATA
@@ -49,7 +51,7 @@ fn format_blob(b: ValueRef) -> String {
         Null => "NULL".to_owned(),
         Integer(i) => format!("{}", i),
         Real(i) => format!("{}", i),
-        Text(i) => format!("'{}'", String::from_utf8_lossy(i).replace("'", "''")),
+        Text(i) => format!("'{}'", String::from_utf8_lossy(i).replace('\'', "''")),
         Blob(b) => format!(
             "[blob {}B]",
             size_format::SizeFormatterSI::new(
@@ -60,66 +62,83 @@ fn format_blob(b: ValueRef) -> String {
     }
 }
 
-impl WritingFileAdapterTrait for SqliteAdapter {
-    fn adapt_write(
-        &self,
-        ai: AdaptInfo,
-        _detection_reason: &FileMatcher,
-        oup: &mut dyn Write,
-    ) -> Result<()> {
-        let AdaptInfo {
-            is_real_file,
-            filepath_hint,
-            line_prefix,
-            ..
-        } = ai;
-        if !is_real_file {
-            // db is in an archive
-            // todo: read to memory and then use that blob if size < max
-            writeln!(oup, "{}[rga: skipping sqlite in archive]", line_prefix,)?;
-            return Ok(());
-        }
-        let inp_fname = filepath_hint;
+fn yielder(ai: AdaptInfo, s: Sender<std::io::Result<Cursor<Vec<u8>>>>) -> Result<()> {
+    let AdaptInfo {
+        is_real_file,
+        filepath_hint,
+        line_prefix,
+        ..
+    } = ai;
+    if !is_real_file {
+        // db is in an archive
+        // todo: read to memory and then use that blob if size < max
+        s.blocking_send(Ok(Cursor::new(
+            format!("{}[rga: skipping sqlite in archive]\n", line_prefix).into_bytes(),
+        )))?;
+        return Ok(());
+    }
+    let inp_fname = filepath_hint;
 
-        let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
-        let tables: Vec<String> = conn
-            .prepare("select name from sqlite_master where type='table'")?
-            .query_map(NO_PARAMS, |r| r.get::<_, String>(0))?
-            .filter_map(|e| e.ok())
+    let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
+    let tables: Vec<String> = conn
+        .prepare("select name from sqlite_master where type='table'")?
+        .query_map([], |r| r.get::<_, String>(0))?
+        .filter_map(|e| e.ok())
+        .collect();
+    debug!("db has {} tables", tables.len());
+    for table in tables {
+        // can't use query param at that position
+        let mut sel = conn.prepare(&format!(
+            "select * from {}",
+            rusqlite::vtab::escape_double_quote(&table)
+        ))?;
+        let col_names: Vec<String> = sel
+            .column_names()
+            .into_iter()
+            .map(|e| e.to_owned())
             .collect();
-        debug!("db has {} tables", tables.len());
-        for table in tables {
-            // can't use query param at that position
-            let mut sel = conn.prepare(&format!(
-                "select * from {}",
-                rusqlite::vtab::escape_double_quote(&table)
-            ))?;
-            let mut z = sel.query(NO_PARAMS)?;
-            let col_names: Vec<String> = z
-                .column_names()
-                .ok_or_else(|| format_err!("no column names"))?
-                .into_iter()
-                .map(|e| e.to_owned())
-                .collect();
-            // writeln!(oup, "{}: {}", table, cols.join(", "))?;
+        let mut z = sel.query([])?;
+        // writeln!(oup, "{}: {}", table, cols.join(", "))?;
 
-            // kind of shitty (lossy) output. maybe output real csv or something?
-            while let Some(row) = z.next()? {
-                writeln!(
-                    oup,
-                    "{}{}: {}",
-                    line_prefix,
-                    table,
-                    col_names
-                        .iter()
-                        .enumerate()
-                        .map(|(i, e)| format!("{}={}", e, format_blob(row.get_raw(i))))
-                        .collect::<Vec<String>>()
-                        .join(", ")
-                )?;
-            }
+        // kind of shitty (lossy) output. maybe output real csv or something?
+        while let Some(row) = z.next()? {
+            let str = format!(
+                "{}{}: {}\n",
+                line_prefix,
+                table,
+                col_names
+                    .iter()
+                    .enumerate()
+                    .map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?))))
+                    .collect::<Result<Vec<String>>>()?
+                    .join(", ")
+            );
+            s.blocking_send(Ok(Cursor::new(str.into_bytes())))?;
         }
-        }
-        Ok(())
+    }
+    Ok(())
+}
+
+impl FileAdapter for SqliteAdapter {
+    fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
+        let (s, r) = mpsc::channel(10);
+        let filepath_hint = format!("{}.txt", ai.filepath_hint.to_string_lossy());
+        let config = ai.config.clone();
+        let line_prefix = ai.line_prefix.clone();
+        let postprocess = ai.postprocess;
+        let archive_recursion_depth = ai.archive_recursion_depth;
+        let joiner = tokio::task::spawn_blocking(|| yielder(ai, s).map_err(to_io_err));
+        Ok(one_file(AdaptInfo {
+            is_real_file: false,
+            filepath_hint: filepath_hint.into(),
+            archive_recursion_depth: archive_recursion_depth + 1,
+            config,
+            inp: Box::pin(
+                StreamReader::new(ReceiverStream::new(r)).chain(join_handle_to_stream(joiner)),
+            ),
+            line_prefix,
+            postprocess,
+        }))
     }
 }
@@ -127,18 +146,18 @@ impl WritingFileAdapterTrait for SqliteAdapter {
 mod test {
     use super::*;
     use crate::test_utils::*;
-    use std::fs::File;
+    use pretty_assertions::assert_eq;
+    use tokio::fs::File;
 
-    #[test]
-    fn simple() -> Result<()> {
-        let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::new());
+    #[tokio::test]
+    async fn simple() -> Result<()> {
+        let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::default());
         let fname = test_data_dir().join("hello.sqlite3");
-        let rd = File::open(&fname)?;
-        let (a, d) = simple_adapt_info(&fname, Box::new(rd));
-        let mut res = adapter.adapt(a, &d)?;
+        let rd = File::open(&fname).await?;
+        let (a, d) = simple_adapt_info(&fname, Box::pin(rd));
+        let res = adapter.adapt(a, &d)?;
 
-        let mut buf = Vec::new();
-        res.read_to_end(&mut buf)?;
+        let buf = adapted_to_vec(res).await?;
 
         assert_eq!(
             String::from_utf8(buf)?,
diff --git a/src/lib.rs b/src/lib.rs
index 3fd4675..d00a25a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,8 +13,12 @@ pub mod recurse;
 pub mod test_utils;
 use anyhow::Context;
 use anyhow::Result;
+use async_stream::stream;
 use directories_next::ProjectDirs;
 use std::time::Instant;
+use tokio::io::AsyncRead;
+use tokio::task::JoinHandle;
+use tokio_util::io::StreamReader;
 
 pub fn project_dirs() -> Result<ProjectDirs> {
     directories_next::ProjectDirs::from("", "", "ripgrep-all")
@@ -72,3 +76,13 @@ pub fn to_io_err(e: anyhow::Error) -> std::io::Error {
 fn init() {
     env_logger::init();
 }
+
+/** returns an AsyncRead that is empty but returns an io error if the given task had an io error or join error */
+pub fn join_handle_to_stream(join: JoinHandle<std::io::Result<()>>) -> impl AsyncRead {
+    let st = stream! {
+        join.await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??;
+        yield std::io::Result::Ok((&b""[..]))
+    };
+
+    StreamReader::new(st)
+}