fix sqlite adapter

This commit is contained in:
phiresky 2022-12-26 22:55:38 +01:00
parent f624a6a29a
commit 82b303bb3d
4 changed files with 112 additions and 86 deletions

View File

@ -3,12 +3,12 @@ pub mod decompress;
// pub mod ffmpeg; // pub mod ffmpeg;
pub mod postproc; pub mod postproc;
use std::sync::Arc; use std::sync::Arc;
// pub mod sqlite; pub mod sqlite;
pub mod tar; pub mod tar;
// pub mod writing; // pub mod writing;
// pub mod zip; // pub mod zip;
use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*}; use crate::{adapted_iter::AdaptedFilesIterBox, config::RgaConfig, matching::*};
use anyhow::*; use anyhow::{format_err, Context, Result};
use custom::CustomAdapterConfig; use custom::CustomAdapterConfig;
use custom::BUILTIN_SPAWNING_ADAPTERS; use custom::BUILTIN_SPAWNING_ADAPTERS;
use log::*; use log::*;

View File

@ -2,6 +2,7 @@ use super::*;
use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata}; use super::{AdaptInfo, AdapterMeta, FileAdapter, GetMetadata};
use crate::adapted_iter::one_file; use crate::adapted_iter::one_file;
use crate::join_handle_to_stream;
use crate::{ use crate::{
adapted_iter::AdaptedFilesIterBox, adapted_iter::AdaptedFilesIterBox,
expand::expand_str_ez, expand::expand_str_ez,
@ -19,7 +20,7 @@ use std::process::Stdio;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::process::Child; use tokio::process::Child;
use tokio::process::Command; use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
// mostly the same as AdapterMeta + SpawningFileAdapter // mostly the same as AdapterMeta + SpawningFileAdapter
@ -134,11 +135,11 @@ lazy_static! {
/// replace a Command.spawn() error "File not found" with a more readable error /// replace a Command.spawn() error "File not found" with a more readable error
/// to indicate some program is not installed /// to indicate some program is not installed
pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> Error { pub fn map_exe_error(err: std::io::Error, exe_name: &str, help: &str) -> anyhow::Error {
use std::io::ErrorKind::*; use std::io::ErrorKind::*;
match err.kind() { match err.kind() {
NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help), NotFound => format_err!("Could not find executable \"{}\". {}", exe_name, help),
_ => Error::from(err), _ => anyhow::Error::from(err),
} }
} }
@ -156,15 +157,7 @@ fn proc_wait(mut child: Child) -> impl AsyncRead {
}; };
StreamReader::new(s) StreamReader::new(s)
} }
/// Returns an `AsyncRead` that carries no data but reports a failure of the given task.
///
/// `join` is awaited inside the stream. A join error is wrapped in an
/// `std::io::Error` of kind `Other`; an `std::io::Error` returned by the task
/// itself is passed on unchanged. When the task succeeded, a single empty chunk
/// is yielded, so the resulting reader contributes zero bytes.
fn join_handle_to_stream(join: JoinHandle<std::io::Result<()>>) -> impl AsyncRead {
let st = stream! {
// First `?`: join failure (mapped to io::Error). Second `?`: the task's own io::Result.
// NOTE(review): relies on `stream!` turning a failed `?` into an `Err` item — confirm against async-stream docs.
join.await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??;
// Success path: emit one zero-length chunk.
yield std::io::Result::Ok(Bytes::copy_from_slice(b""))
};
StreamReader::new(st)
}
pub fn pipe_output( pub fn pipe_output(
_line_prefix: &str, _line_prefix: &str,
mut cmd: Command, mut cmd: Command,

View File

@ -1,11 +1,18 @@
use crate::{adapted_iter::one_file, join_handle_to_stream, to_io_err};
use super::*; use super::*;
use anyhow::Result; use anyhow::Result;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::*; use log::*;
use rusqlite::types::ValueRef; use rusqlite::types::ValueRef;
use rusqlite::*; use rusqlite::*;
use std::convert::TryInto; use std::{convert::TryInto, io::Cursor};
use writing::{WritingFileAdapter, WritingFileAdapterTrait}; use tokio::{
io::AsyncReadExt,
sync::mpsc::{self, Sender},
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::StreamReader;
static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"]; static EXTENSIONS: &[&str] = &["db", "db3", "sqlite", "sqlite3"];
@ -32,11 +39,6 @@ lazy_static! {
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct SqliteAdapter; pub struct SqliteAdapter;
// Constructor helper for `SqliteAdapter`.
impl SqliteAdapter {
/// Builds a `SqliteAdapter` and boxes it inside a `WritingFileAdapter`.
///
/// Note that the return type is the `WritingFileAdapter` wrapper, not `Self`.
pub fn new() -> WritingFileAdapter {
WritingFileAdapter::new(Box::new(SqliteAdapter {}))
}
}
impl GetMetadata for SqliteAdapter { impl GetMetadata for SqliteAdapter {
fn metadata(&self) -> &AdapterMeta { fn metadata(&self) -> &AdapterMeta {
&METADATA &METADATA
@ -49,7 +51,7 @@ fn format_blob(b: ValueRef) -> String {
Null => "NULL".to_owned(), Null => "NULL".to_owned(),
Integer(i) => format!("{}", i), Integer(i) => format!("{}", i),
Real(i) => format!("{}", i), Real(i) => format!("{}", i),
Text(i) => format!("'{}'", String::from_utf8_lossy(i).replace("'", "''")), Text(i) => format!("'{}'", String::from_utf8_lossy(i).replace('\'', "''")),
Blob(b) => format!( Blob(b) => format!(
"[blob {}B]", "[blob {}B]",
size_format::SizeFormatterSI::new( size_format::SizeFormatterSI::new(
@ -60,13 +62,7 @@ fn format_blob(b: ValueRef) -> String {
} }
} }
impl WritingFileAdapterTrait for SqliteAdapter { fn yielder(ai: AdaptInfo, s: Sender<std::io::Result<Cursor<Vec<u8>>>>) -> Result<()> {
fn adapt_write(
&self,
ai: AdaptInfo,
_detection_reason: &FileMatcher,
oup: &mut dyn Write,
) -> Result<()> {
let AdaptInfo { let AdaptInfo {
is_real_file, is_real_file,
filepath_hint, filepath_hint,
@ -76,7 +72,9 @@ impl WritingFileAdapterTrait for SqliteAdapter {
if !is_real_file { if !is_real_file {
// db is in an archive // db is in an archive
// todo: read to memory and then use that blob if size < max // todo: read to memory and then use that blob if size < max
writeln!(oup, "{}[rga: skipping sqlite in archive]", line_prefix,)?; s.blocking_send(Ok(Cursor::new(
format!("{}[rga: skipping sqlite in archive]\n", line_prefix).into_bytes(),
)))?;
return Ok(()); return Ok(());
} }
let inp_fname = filepath_hint; let inp_fname = filepath_hint;
@ -84,7 +82,7 @@ impl WritingFileAdapterTrait for SqliteAdapter {
let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?; let conn = Connection::open_with_flags(inp_fname, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
let tables: Vec<String> = conn let tables: Vec<String> = conn
.prepare("select name from sqlite_master where type='table'")? .prepare("select name from sqlite_master where type='table'")?
.query_map(NO_PARAMS, |r| r.get::<_, String>(0))? .query_map([], |r| r.get::<_, String>(0))?
.filter_map(|e| e.ok()) .filter_map(|e| e.ok())
.collect(); .collect();
debug!("db has {} tables", tables.len()); debug!("db has {} tables", tables.len());
@ -94,51 +92,72 @@ impl WritingFileAdapterTrait for SqliteAdapter {
"select * from {}", "select * from {}",
rusqlite::vtab::escape_double_quote(&table) rusqlite::vtab::escape_double_quote(&table)
))?; ))?;
let mut z = sel.query(NO_PARAMS)?; let col_names: Vec<String> = sel
let col_names: Vec<String> = z
.column_names() .column_names()
.ok_or_else(|| format_err!("no column names"))?
.into_iter() .into_iter()
.map(|e| e.to_owned()) .map(|e| e.to_owned())
.collect(); .collect();
let mut z = sel.query([])?;
// writeln!(oup, "{}: {}", table, cols.join(", "))?; // writeln!(oup, "{}: {}", table, cols.join(", "))?;
// kind of shitty (lossy) output. maybe output real csv or something? // kind of shitty (lossy) output. maybe output real csv or something?
while let Some(row) = z.next()? { while let Some(row) = z.next()? {
writeln!( let str = format!(
oup, "{}{}: {}\n",
"{}{}: {}",
line_prefix, line_prefix,
table, table,
col_names col_names
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, e)| format!("{}={}", e, format_blob(row.get_raw(i)))) .map(|(i, e)| Ok(format!("{}={}", e, format_blob(row.get_ref(i)?))))
.collect::<Vec<String>>() .collect::<Result<Vec<String>>>()?
.join(", ") .join(", ")
)?; );
s.blocking_send(Ok(Cursor::new(str.into_bytes())))?;
} }
} }
Ok(()) Ok(())
} }
impl FileAdapter for SqliteAdapter {
/// Adapts a sqlite database into one text stream.
///
/// The extraction itself (`yielder`) is handed to `tokio::task::spawn_blocking`
/// and sends its output chunks through a bounded channel (capacity 10). The
/// returned `AdaptInfo` reads from the receiving end of that channel.
/// `_detection_reason` is not used.
///
/// The result is wrapped with `one_file` and describes a non-real file whose
/// path hint is the original hint with `.txt` appended and whose
/// `archive_recursion_depth` is one greater than the input's.
fn adapt(&self, ai: AdaptInfo, _detection_reason: &FileMatcher) -> Result<AdaptedFilesIterBox> {
let (s, r) = mpsc::channel(10);
// `ai` is moved into the closure below, so everything the returned
// `AdaptInfo` needs is copied or cloned out of it first.
let filepath_hint = format!("{}.txt", ai.filepath_hint.to_string_lossy());
let config = ai.config.clone();
let line_prefix = ai.line_prefix.clone();
let postprocess = ai.postprocess;
let archive_recursion_depth = ai.archive_recursion_depth;
// `to_io_err` converts the anyhow error so the handle has the
// `JoinHandle<std::io::Result<()>>` type that `join_handle_to_stream` takes.
let joiner = tokio::task::spawn_blocking(|| yielder(ai, s).map_err(to_io_err));
Ok(one_file(AdaptInfo {
is_real_file: false,
filepath_hint: filepath_hint.into(),
archive_recursion_depth: archive_recursion_depth + 1,
config,
// Channel data first, then the join-handle reader chained after it.
// NOTE(review): presumably this is how a failure inside `yielder` surfaces
// as a read error once the channel is drained — confirm.
inp: Box::pin(
StreamReader::new(ReceiverStream::new(r)).chain(join_handle_to_stream(joiner)),
),
line_prefix,
postprocess,
}))
}
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::test_utils::*; use crate::test_utils::*;
use std::fs::File; use pretty_assertions::assert_eq;
use tokio::fs::File;
#[test] #[tokio::test]
fn simple() -> Result<()> { async fn simple() -> Result<()> {
let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::new()); let adapter: Box<dyn FileAdapter> = Box::new(SqliteAdapter::default());
let fname = test_data_dir().join("hello.sqlite3"); let fname = test_data_dir().join("hello.sqlite3");
let rd = File::open(&fname)?; let rd = File::open(&fname).await?;
let (a, d) = simple_adapt_info(&fname, Box::new(rd)); let (a, d) = simple_adapt_info(&fname, Box::pin(rd));
let mut res = adapter.adapt(a, &d)?; let res = adapter.adapt(a, &d)?;
let mut buf = Vec::new(); let buf = adapted_to_vec(res).await?;
res.read_to_end(&mut buf)?;
assert_eq!( assert_eq!(
String::from_utf8(buf)?, String::from_utf8(buf)?,

View File

@ -13,8 +13,12 @@ pub mod recurse;
pub mod test_utils; pub mod test_utils;
use anyhow::Context; use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use async_stream::stream;
use directories_next::ProjectDirs; use directories_next::ProjectDirs;
use std::time::Instant; use std::time::Instant;
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;
pub fn project_dirs() -> Result<ProjectDirs> { pub fn project_dirs() -> Result<ProjectDirs> {
directories_next::ProjectDirs::from("", "", "ripgrep-all") directories_next::ProjectDirs::from("", "", "ripgrep-all")
@ -72,3 +76,13 @@ pub fn to_io_err(e: anyhow::Error) -> std::io::Error {
fn init() { fn init() {
env_logger::init(); env_logger::init();
} }
/// Returns an `AsyncRead` that carries no data but reports a failure of the given task.
///
/// `join` is awaited inside the stream. A join error is wrapped in an
/// `std::io::Error` of kind `Other`; an `std::io::Error` returned by the task
/// itself is passed on unchanged. When the task succeeded, a single empty byte
/// slice is yielded, so the resulting reader contributes zero bytes.
pub fn join_handle_to_stream(join: JoinHandle<std::io::Result<()>>) -> impl AsyncRead {
let st = stream! {
// First `?`: join failure (mapped to io::Error). Second `?`: the task's own io::Result.
// NOTE(review): relies on `stream!` turning a failed `?` into an `Err` item — confirm against async-stream docs.
join.await.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))??;
// Success path: emit one zero-length slice.
// NOTE(review): the inner parentheses around `&b""[..]` are redundant.
yield std::io::Result::Ok((&b""[..]))
};
StreamReader::new(st)
}