From ddeaf13766723a9827956b4be405702417996c54 Mon Sep 17 00:00:00 2001
From: phiresky
Date: Thu, 25 May 2023 15:37:46 +0200
Subject: [PATCH] replace cache with sqlite

---
 Cargo.lock            |  12 ++
 Cargo.toml            |   2 +
 src/caching_writer.rs |  10 +-
 src/preproc.rs        |  69 ++++-------
 src/preproc_cache.rs  | 280 ++++++++++++++++++++++++------------------
 5 files changed, 202 insertions(+), 171 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 56089e6..81cd196 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1486,6 +1486,7 @@ dependencies = [
  "structopt",
  "tempfile",
  "tokio",
+ "tokio-rusqlite",
  "tokio-stream",
  "tokio-tar",
  "tokio-test",
@@ -1852,6 +1853,17 @@ dependencies = [
  "syn 2.0.16",
 ]
 
+[[package]]
+name = "tokio-rusqlite"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7aa66395f5ff117faee90c9458232c936405f9227ad902038000b74b3bc1feac"
+dependencies = [
+ "crossbeam-channel",
+ "rusqlite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-stream"
 version = "0.1.14"
diff --git a/Cargo.toml b/Cargo.toml
index babb346..2c232d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,6 +51,7 @@ size_format = "1.0.2"
 structopt = "0.3.17"
 tempfile = "3.1.0"
 tokio = {version = "1.21.2", features = ["full"]}
+tokio-rusqlite = "0.4.0"
 tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]}
 tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" }
 tokio-util = {version = "0.7.4", features = ["io", "full"]}
@@ -60,4 +61,5 @@ tree_magic = {package = "tree_magic_mini", version = "3.0.0"}
 async-recursion = "1.0.0"
 ctor = "0.2.0"
 pretty_assertions = "1.3.0"
+tempfile = "3.1.0"
 tokio-test = "0.4.2"
diff --git a/src/caching_writer.rs b/src/caching_writer.rs
index 9d2cbc1..1f2481d 100644
--- a/src/caching_writer.rs
+++ b/src/caching_writer.rs
@@ -1,17 +1,17 @@
-use std::pin::Pin;
+use std::{future::Future, pin::Pin};
 
 use anyhow::Result;
 use async_compression::tokio::write::ZstdEncoder;
 use async_stream::stream;
+use crate::to_io_err;
 use log::*;
 use tokio::io::{AsyncRead, AsyncWriteExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
 
-use crate::to_io_err;
-
-type FinishHandler = dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send;
+type FinishHandler =
+    dyn FnOnce((u64, Option<Vec<u8>>)) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send;
 /**
  * wrap a AsyncRead so that it is passthrough,
  * but also the written data is compressed and written into a buffer,
@@ -64,7 +64,7 @@ pub fn async_read_and_write_to_cache<'a>(
             };
             // EOF, finish!
-            on_finish(finish)
+            on_finish(finish).await
                 .map_err(to_io_err)?;
         };
 
diff --git a/src/preproc.rs b/src/preproc.rs
index 1e6c2a7..e65e741 100644
--- a/src/preproc.rs
+++ b/src/preproc.rs
@@ -3,9 +3,10 @@ use crate::adapters::*;
 use crate::caching_writer::async_read_and_write_to_cache;
 use crate::config::RgaConfig;
 use crate::matching::*;
+use crate::preproc_cache::CacheKey;
 use crate::recurse::concat_read_streams;
 use crate::{
-    preproc_cache::{LmdbCache, PreprocCache},
+    preproc_cache::{open_cache_db, PreprocCache},
     print_bytes,
 };
 use anyhow::*;
@@ -13,7 +14,6 @@ use async_compression::tokio::bufread::ZstdDecoder;
 use async_stream::stream;
 // use futures::future::{BoxFuture, FutureExt};
 use log::*;
-use path_clean::PathClean;
 use postproc::PostprocPrefix;
 use std::future::Future;
 use std::io::Cursor;
@@ -24,7 +24,7 @@ use tokio::io::AsyncBufRead;
 use tokio::io::AsyncBufReadExt;
 use tokio::io::BufReader;
 
-type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
+pub type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
 
 async fn choose_adapter(
     config: &RgaConfig,
@@ -123,36 +123,6 @@ pub async fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
         .with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy()))
 }
 
-fn compute_cache_key(
-    filepath_hint: &Path,
-    adapter: &dyn FileAdapter,
-    active_adapters: ActiveAdapters,
-) -> Result<Vec<u8>> {
-    let clean_path = filepath_hint.to_owned().clean();
-    let meta = std::fs::metadata(filepath_hint)
-        .with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
-    let modified = meta.modified().expect("weird OS that can't into mtime");
-
-    if adapter.metadata().recurses {
-        let active_adapters_cache_key = active_adapters
-            .iter()
-            .map(|a| (a.metadata().name.clone(), a.metadata().version))
-            .collect::<Vec<_>>();
-        let key = (active_adapters_cache_key, clean_path, modified);
-        debug!("Cache key (with recursion): {:?}", key);
-        bincode::serialize(&key).context("could not serialize path")
-    } else {
-        let key = (
-            adapter.metadata().name.clone(),
-            adapter.metadata().version,
-            clean_path,
-            modified,
-        );
-        debug!("Cache key (no recursion): {:?}", key);
-        bincode::serialize(&key).context("could not serialize path")
-    }
-}
-
 async fn adapt_caching(
     ai: AdaptInfo,
     adapter: Arc<dyn FileAdapter>,
@@ -169,21 +139,19 @@
         ai.filepath_hint.to_string_lossy(),
         &meta.name
     );
-    let db_name = format!("{}.v{}", meta.name, meta.version);
     let cache_compression_level = ai.config.cache.compression_level;
     let cache_max_blob_len = ai.config.cache.max_blob_len;
-    let cache = if ai.is_real_file {
-        LmdbCache::open(&ai.config.cache)?
+    let cache = if ai.is_real_file && !ai.config.cache.disabled {
+        Some(open_cache_db(Path::new(&ai.config.cache.path.0)).await?)
     } else {
         None
     };
     let mut cache = cache.context("No cache?")?;
-    let cache_key: Vec<u8> =
-        compute_cache_key(&ai.filepath_hint, adapter.as_ref(), active_adapters)?;
+    let cache_key = CacheKey::new(&ai.filepath_hint, adapter.as_ref(), &active_adapters)?;
     // let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
-    let cached = cache.get(&db_name, &cache_key)?;
+    let cached = cache.get(&cache_key).await?;
     match cached {
         Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))),
         None => {
@@ -195,15 +163,20 @@
                 cache_max_blob_len.0,
                 cache_compression_level.0,
                 Box::new(move |(uncompressed_size, compressed)| {
-                    debug!(
-                        "uncompressed output: {}",
-                        print_bytes(uncompressed_size as f64)
-                    );
-                    if let Some(cached) = compressed {
-                        debug!("compressed output: {}", print_bytes(cached.len() as f64));
-                        cache.set(&db_name, &cache_key, &cached)?
-                    }
-                    Ok(())
+                    Box::pin(async move {
+                        debug!(
+                            "uncompressed output: {}",
+                            print_bytes(uncompressed_size as f64)
+                        );
+                        if let Some(cached) = compressed {
+                            debug!("compressed output: {}", print_bytes(cached.len() as f64));
+                            cache
+                                .set(&cache_key, cached)
+                                .await
+                                .context("writing to cache")?
+                        }
+                        Ok(())
+                    })
                 }),
             )?;
diff --git a/src/preproc_cache.rs b/src/preproc_cache.rs
index 764213d..2f12c16 100644
--- a/src/preproc_cache.rs
+++ b/src/preproc_cache.rs
@@ -1,135 +1,179 @@
-use crate::{config::CacheConfig, print_bytes, print_dur};
-use anyhow::{format_err, Context, Result};
+use crate::{adapters::FileAdapter, preproc::ActiveAdapters};
+use anyhow::{Context, Result};
 use log::*;
-use rkv::backend::{BackendEnvironmentBuilder, LmdbEnvironment};
-use std::{fmt::Display, path::Path, time::Instant};
+use path_clean::PathClean;
+use rusqlite::{named_params, OptionalExtension};
+use std::{path::Path, time::UNIX_EPOCH};
+use tokio_rusqlite::Connection;
 
-pub trait PreprocCache: Send + Sync {
-    /*/// gets cache at specified key.
-    /// if cache hit, return the resulting data
-    /// else, run the given lambda, and store its result in the cache if present
-    fn get_or_run<'a>(
-        &mut self,
-        db_name: &str,
-        key: &[u8],
-        debug_name: &str,
-        runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
-    ) -> Result<Option<Vec<u8>>>;*/
-
-    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
-    fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
+#[derive(Clone)]
+pub struct CacheKey {
+    adapter: String,
+    adapter_version: i32,
+    active_adapters: String,
+    file_path: String,
+    file_mtime_unix_ms: i64,
 }
-
-/// opens a LMDB cache
-fn open_cache_db(
-    path: &Path,
-) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> {
-    std::fs::create_dir_all(path)?;
-    // use rkv::backend::LmdbEnvironmentFlags;
-
-    rkv::Manager::<LmdbEnvironment>::singleton()
-        .write()
-        .map_err(|_| format_err!("could not write cache db manager"))?
-        .get_or_create(path, |p| {
-            let mut builder = rkv::Rkv::environment_builder::<rkv::backend::Lmdb>();
-            builder
-                .set_flags(rkv::EnvironmentFlags::NO_SYNC)
-                .set_flags(rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
-                // i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
-                // LmdbError(ReadersFull). Those "open readers" stay even after the corresponding processes exit.
-                // hope setting this doesn't break integrity
-                .set_flags(rkv::EnvironmentFlags::NO_TLS)
-                // sometimes, this seems to cause the data.mdb file to appear as 2GB in size (with holes), but sometimes not?
-                .set_map_size(2 * 1024 * 1024 * 1024)
-                .set_max_dbs(100)
-                .set_max_readers(128);
-            rkv::Rkv::from_builder(p, builder)
+impl CacheKey {
+    pub fn new(
+        filepath_hint: &Path,
+        adapter: &dyn FileAdapter,
+        active_adapters: &ActiveAdapters,
+    ) -> Result<CacheKey> {
+        let meta = std::fs::metadata(filepath_hint)
+            .with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
+        let modified = meta.modified().expect("weird OS that can't into mtime");
+        let file_mtime_unix_ms = modified.duration_since(UNIX_EPOCH)?.as_millis() as i64;
+        let active_adapters = if adapter.metadata().recurses {
+            serde_json::to_string(
+                &active_adapters
+                    .iter()
+                    .map(|a| format!("{}.v{}", a.metadata().name, a.metadata().version))
+                    .collect::<Vec<_>>(),
+            )?
+        } else {
+            "null".to_string()
+        };
+        Ok(CacheKey {
+            adapter: adapter.metadata().name.clone(),
+            adapter_version: adapter.metadata().version,
+            file_path: filepath_hint.clean().to_string_lossy().to_string(),
+            file_mtime_unix_ms,
+            active_adapters,
         })
-        .map_err(|e| format_err!("could not get/create cache db: {}", e))
-}
-
-pub struct LmdbCache {
-    db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>,
-}
-
-impl LmdbCache {
-    pub fn open(config: &CacheConfig) -> Result<Option<LmdbCache>> {
-        if config.disabled {
-            return Ok(None);
-        }
-        let path = Path::new(&config.path.0);
-        Ok(Some(LmdbCache {
-            db_arc: open_cache_db(path)?,
-        }))
     }
 }
 
-#[derive(Debug)]
-struct RkvErrWrap(rkv::StoreError);
-impl Display for RkvErrWrap {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
+#[async_trait::async_trait]
+pub trait PreprocCache {
+    async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>>;
+    async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()>;
+}
+
+async fn pragmas(db: &Connection) -> Result<()> {
+    // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/
+    //let want_page_size = 32768;
+    //db.execute(&format!("pragma page_size = {};", want_page_size))
+    //    .context("setup pragma 1")?;
+    db.call(|db| {
+        db.execute_batch(
+            "
+            pragma journal_mode = WAL;
+            pragma foreign_keys = on;
+            pragma temp_store = memory;
+            pragma synchronous = off; -- integrity isn't very important here
+            pragma mmap_size = 30000000000;
+
+            pragma application_id = 924716026;
+            pragma user_version = 2; -- todo: on upgrade clear db if version is unexpected
+
+            create table if not exists preproc_cache (
+                adapter text not null,
+                adapter_version integer not null,
+                created_unix_ms integer not null default (unixepoch() * 1000),
+                active_adapters text not null, -- 'null' if adapter cannot recurse
+                file_path text not null,
+                file_mtime_unix_ms integer not null,
+                text_content_zstd blob not null
+            ) strict;
+
+            create unique index if not exists preproc_cache_idx on preproc_cache (adapter, adapter_version, file_path, active_adapters);
+            ",
+        )
+    })
+    .await?;
+    /*let jm: String = db
+        .call(|db| db.pragma_query_value(None, "journal_mode", |r| r.get(0))?)
+        .await?;
+    if &jm != "wal" {
+        anyhow::bail!("journal mode is not wal");
+    }*/
+    Ok(())
+}
+
+struct SqliteCache {
+    db: Connection,
+}
+impl SqliteCache {
+    async fn new(path: &Path) -> Result<SqliteCache> {
+        let db = Connection::open(path.join("cache.sqlite3")).await?;
+        pragmas(&db).await?;
+
+        Ok(SqliteCache { db })
     }
 }
 
-impl std::error::Error for RkvErrWrap {}
-
-impl PreprocCache for LmdbCache {
-    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let start = Instant::now();
-        let db_env = self
-            .db_arc
-            .read()
-            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
-        let db = db_env
-            .open_single(db_name, rkv::store::Options::create())
-            .map_err(RkvErrWrap)
-            .context("could not open cache db store")?;
-
-        let reader = db_env.read().expect("could not get reader");
-        let cached = db
-            .get(&reader, key)
-            .map_err(RkvErrWrap)
-            .context("could not read from db")?;
-
-        match cached {
-            Some(rkv::Value::Blob(cached)) => {
-                debug!(
-                    "cache HIT, reading {} (compressed) from cache",
-                    print_bytes(cached.len() as f64)
-                );
-                debug!("reading from cache took {}", print_dur(start));
-                Ok(Some(Vec::from(cached)))
-            }
-            Some(_) => Err(format_err!("Integrity: value not blob"))?,
-            None => Ok(None),
-        }
+#[async_trait::async_trait]
+impl PreprocCache for SqliteCache {
+    async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>> {
+        let key = (*key).clone(); // todo: without cloning
+        Ok(self
+            .db
+            .call(move |db| {
+                db.query_row(
+                    "select text_content_zstd from preproc_cache where
+                        adapter = :adapter
+                        and adapter_version = :adapter_version
+                        and active_adapters = :active_adapters
+                        and file_path = :file_path
+                        and file_mtime_unix_ms = :file_mtime_unix_ms
+                    ",
+                    named_params! {
+                        ":adapter": &key.adapter,
+                        ":adapter_version": &key.adapter_version,
+                        ":active_adapters": &key.active_adapters,
+                        ":file_path": &key.file_path,
+                        ":file_mtime_unix_ms": &key.file_mtime_unix_ms
+                    },
+                    |r| r.get::<_, Vec<u8>>(0),
+                )
+                .optional()
+            })
+            .await
+            .context("reading from cache")?)
     }
 
-    fn set(&mut self, db_name: &str, key: &[u8], got: &[u8]) -> Result<()> {
-        let start = Instant::now();
-        debug!("writing {} to cache", print_bytes(got.len() as f64));
-        let db_env = self
-            .db_arc
-            .read()
-            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
-        let db = db_env
-            .open_single(db_name, rkv::store::Options::create())
-            .map_err(RkvErrWrap)
-            .context("could not open cache db store")?;
+    async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()> {
+        let key = (*key).clone(); // todo: without cloning
+        Ok(self
+            .db
+            .call(move |db| {
+                db.execute(
+                    "insert into preproc_cache (adapter, adapter_version, active_adapters, file_path, file_mtime_unix_ms, text_content_zstd) values
+                        (:adapter, :adapter_version, :active_adapters, :file_path, :file_mtime_unix_ms, :text_content_zstd)
+                     on conflict (adapter, adapter_version, active_adapters, file_path) do update set
+                        file_mtime_unix_ms = :file_mtime_unix_ms,
+                        created_unix_ms = unixepoch() * 1000,
+                        text_content_zstd = :text_content_zstd",
                    named_params! {
+                        ":adapter": &key.adapter,
+                        ":adapter_version": &key.adapter_version,
+                        ":active_adapters": &key.active_adapters,
+                        ":file_path": &key.file_path,
+                        ":file_mtime_unix_ms": &key.file_mtime_unix_ms,
+                        ":text_content_zstd": value
+                    }
+                ).map(|_| ())
+            })
+            .await?)
+    }
+}
+
+/// opens a default cache
+pub async fn open_cache_db(path: &Path) -> Result<impl PreprocCache> {
+    std::fs::create_dir_all(path)?;
+    SqliteCache::new(path).await
+}
 
-        let mut writer = db_env
-            .write()
-            .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not open write handle to cache"))?;
+#[cfg(test)]
+mod test {
 
-        db.put(&mut writer, key, &rkv::Value::Blob(got))
-            .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not write to cache"))?;
-        writer
-            .commit()
-            .map_err(RkvErrWrap)
-            .with_context(|| "could not write cache".to_string())?;
-        debug!("writing to cache took {}", print_dur(start));
+    use crate::preproc_cache::*;
+
+    #[tokio::test]
+    async fn test_read_write() -> anyhow::Result<()> {
+        let path = tempfile::tempdir()?;
+        let db = open_cache_db(&path.path().join("foo.sqlite3")).await?;
+        // db.set();
         Ok(())
     }
 }
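
Note, not part of the patch above: the new cache goes through tokio-rusqlite, which keeps the
blocking rusqlite connection on a dedicated background thread and exposes it through an async
call() method; every closure handed to call() runs on that thread and returns a rusqlite::Result,
which is why get() and set() in src/preproc_cache.rs are written as db.call(move |db| ...).await.
The sketch below only illustrates that access pattern and is not rga code: it assumes the
dependencies the patch adds (tokio-rusqlite 0.4, tokio with the "full" feature, anyhow) plus a
direct rusqlite dependency of the version tokio-rusqlite 0.4 uses, and the file name
"example-cache.sqlite3" and the kv table are invented for the example.

    use anyhow::Result;
    use tokio_rusqlite::Connection;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Open (or create) a database file; the patch itself opens "<cache dir>/cache.sqlite3".
        let db = Connection::open("example-cache.sqlite3").await?;

        // Schema setup, same shape as the pragmas() helper in the patch:
        // the closure runs on the connection's background thread.
        db.call(|db| {
            db.execute_batch(
                "pragma journal_mode = WAL;
                 create table if not exists kv (k text primary key, v blob not null);",
            )
        })
        .await?;

        // A write followed by a read, mirroring the set()/get() pattern of SqliteCache.
        db.call(|db| {
            db.execute(
                "insert or replace into kv (k, v) values (?1, ?2)",
                rusqlite::params!["some-key", vec![1u8, 2, 3]],
            )
        })
        .await?;

        let cached: Option<Vec<u8>> = db
            .call(|db| {
                use rusqlite::OptionalExtension;
                db.query_row(
                    "select v from kv where k = ?1",
                    rusqlite::params!["some-key"],
                    |r| r.get(0),
                )
                .optional()
            })
            .await?;
        println!("cached value: {:?}", cached);
        Ok(())
    }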