replace cache with sqlite

phiresky 2023-05-25 15:37:46 +02:00
parent da4a4ce135
commit ddeaf13766
5 changed files with 202 additions and 171 deletions

Cargo.lock generated (12 changes)
View File

@@ -1486,6 +1486,7 @@ dependencies = [
  "structopt",
  "tempfile",
  "tokio",
+ "tokio-rusqlite",
  "tokio-stream",
  "tokio-tar",
  "tokio-test",
@@ -1852,6 +1853,17 @@ dependencies = [
  "syn 2.0.16",
 ]

+[[package]]
+name = "tokio-rusqlite"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7aa66395f5ff117faee90c9458232c936405f9227ad902038000b74b3bc1feac"
+dependencies = [
+ "crossbeam-channel",
+ "rusqlite",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-stream"
 version = "0.1.14"

View File

@@ -51,6 +51,7 @@ size_format = "1.0.2"
 structopt = "0.3.17"
 tempfile = "3.1.0"
 tokio = {version = "1.21.2", features = ["full"]}
+tokio-rusqlite = "0.4.0"
 tokio-stream = {version = "0.1.11", features = ["io-util", "tokio-util"]}
 tokio-tar = { git = "https://github.com/vorot93/tokio-tar", version = "0.3.0" }
 tokio-util = {version = "0.7.4", features = ["io", "full"]}
@@ -60,4 +61,5 @@ tree_magic = {package = "tree_magic_mini", version = "3.0.0"}
 async-recursion = "1.0.0"
 ctor = "0.2.0"
 pretty_assertions = "1.3.0"
+tempfile = "3.1.0"
 tokio-test = "0.4.2"
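
Editor's note (not part of the commit): the new tokio-rusqlite dependency is used through tokio_rusqlite::Connection, whose call method runs blocking rusqlite work on the connection's own thread and hands back a future. A minimal sketch of that call pattern, mirroring how the preproc_cache code below uses it; the table and column names here are placeholders, and it assumes rusqlite, tokio and anyhow are also available as dependencies:

use tokio_rusqlite::Connection;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db = Connection::open_in_memory().await?;
    // blocking rusqlite work happens inside `call`, on the connection's background thread
    db.call(|db| db.execute_batch("create table if not exists kv (k text primary key, v blob);"))
        .await?;
    db.call(|db| {
        db.execute(
            "insert into kv (k, v) values (?1, ?2)",
            rusqlite::params!["some-key", vec![1u8, 2, 3]],
        )
        .map(|_| ())
    })
    .await?;
    Ok(())
}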

View File

@@ -1,17 +1,17 @@
-use std::pin::Pin;
+use std::{future::Future, pin::Pin};

 use anyhow::Result;
 use async_compression::tokio::write::ZstdEncoder;
 use async_stream::stream;
+use crate::to_io_err;
 use log::*;
 use tokio::io::{AsyncRead, AsyncWriteExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
-use crate::to_io_err;

-type FinishHandler = dyn FnOnce((u64, Option<Vec<u8>>)) -> Result<()> + Send;
+type FinishHandler =
+    dyn FnOnce((u64, Option<Vec<u8>>)) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send;

 /**
  * wrap a AsyncRead so that it is passthrough,
  * but also the written data is compressed and written into a buffer,
@@ -64,7 +64,7 @@ pub fn async_read_and_write_to_cache<'a>(
             };
             // EOF, finish!
-            on_finish(finish)
+            on_finish(finish).await
                 .map_err(to_io_err)?;
         };
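
Editor's note (not part of the commit): the FinishHandler alias is widened from a synchronous closure to one that returns a boxed future, so the finish callback can await async work such as the cache write. A self-contained sketch of constructing and invoking such a handler under that type, assuming tokio and anyhow:

use std::{future::Future, pin::Pin};
use anyhow::Result;

type FinishHandler =
    dyn FnOnce((u64, Option<Vec<u8>>)) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send;

async fn run(handler: Box<FinishHandler>) -> Result<()> {
    // invoke the handler and await the future it returns
    handler((42, Some(vec![1, 2, 3]))).await
}

#[tokio::main]
async fn main() -> Result<()> {
    run(Box::new(move |(uncompressed_size, compressed)| {
        Box::pin(async move {
            // the async body may .await here, as the caching code does
            println!(
                "finished: {} bytes uncompressed, cached: {}",
                uncompressed_size,
                compressed.is_some()
            );
            Ok(())
        })
    }))
    .await
}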

View File

@@ -3,9 +3,10 @@ use crate::adapters::*;
 use crate::caching_writer::async_read_and_write_to_cache;
 use crate::config::RgaConfig;
 use crate::matching::*;
+use crate::preproc_cache::CacheKey;
 use crate::recurse::concat_read_streams;
 use crate::{
-    preproc_cache::{LmdbCache, PreprocCache},
+    preproc_cache::{open_cache_db, PreprocCache},
     print_bytes,
 };
 use anyhow::*;
@@ -13,7 +14,6 @@ use async_compression::tokio::bufread::ZstdDecoder;
 use async_stream::stream;
 // use futures::future::{BoxFuture, FutureExt};
 use log::*;
-use path_clean::PathClean;
 use postproc::PostprocPrefix;
 use std::future::Future;
 use std::io::Cursor;
@@ -24,7 +24,7 @@ use tokio::io::AsyncBufRead;
 use tokio::io::AsyncBufReadExt;
 use tokio::io::BufReader;

-type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;
+pub type ActiveAdapters = Vec<Arc<dyn FileAdapter>>;

 async fn choose_adapter(
     config: &RgaConfig,
@@ -123,36 +123,6 @@ pub async fn rga_preproc(ai: AdaptInfo) -> Result<ReadBox> {
         .with_context(|| format!("run_adapter({})", &path_hint_copy.to_string_lossy()))
 }

-fn compute_cache_key(
-    filepath_hint: &Path,
-    adapter: &dyn FileAdapter,
-    active_adapters: ActiveAdapters,
-) -> Result<Vec<u8>> {
-    let clean_path = filepath_hint.to_owned().clean();
-    let meta = std::fs::metadata(filepath_hint)
-        .with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
-    let modified = meta.modified().expect("weird OS that can't into mtime");
-    if adapter.metadata().recurses {
-        let active_adapters_cache_key = active_adapters
-            .iter()
-            .map(|a| (a.metadata().name.clone(), a.metadata().version))
-            .collect::<Vec<_>>();
-        let key = (active_adapters_cache_key, clean_path, modified);
-        debug!("Cache key (with recursion): {:?}", key);
-        bincode::serialize(&key).context("could not serialize path")
-    } else {
-        let key = (
-            adapter.metadata().name.clone(),
-            adapter.metadata().version,
-            clean_path,
-            modified,
-        );
-        debug!("Cache key (no recursion): {:?}", key);
-        bincode::serialize(&key).context("could not serialize path")
-    }
-}
-
 async fn adapt_caching(
     ai: AdaptInfo,
     adapter: Arc<dyn FileAdapter>,
@@ -169,21 +139,19 @@ async fn adapt_caching(
         ai.filepath_hint.to_string_lossy(),
         &meta.name
     );
-    let db_name = format!("{}.v{}", meta.name, meta.version);
     let cache_compression_level = ai.config.cache.compression_level;
     let cache_max_blob_len = ai.config.cache.max_blob_len;
-    let cache = if ai.is_real_file {
-        LmdbCache::open(&ai.config.cache)?
+    let cache = if ai.is_real_file && !ai.config.cache.disabled {
+        Some(open_cache_db(Path::new(&ai.config.cache.path.0)).await?)
     } else {
         None
     };
     let mut cache = cache.context("No cache?")?;
-    let cache_key: Vec<u8> =
-        compute_cache_key(&ai.filepath_hint, adapter.as_ref(), active_adapters)?;
+    let cache_key = CacheKey::new(&ai.filepath_hint, adapter.as_ref(), &active_adapters)?;
     // let dbg_ctx = format!("adapter {}", &adapter.metadata().name);
-    let cached = cache.get(&db_name, &cache_key)?;
+    let cached = cache.get(&cache_key).await?;
     match cached {
         Some(cached) => Ok(Box::pin(ZstdDecoder::new(Cursor::new(cached)))),
         None => {
@@ -195,15 +163,20 @@ async fn adapt_caching(
                 cache_max_blob_len.0,
                 cache_compression_level.0,
                 Box::new(move |(uncompressed_size, compressed)| {
-                    debug!(
-                        "uncompressed output: {}",
-                        print_bytes(uncompressed_size as f64)
-                    );
-                    if let Some(cached) = compressed {
-                        debug!("compressed output: {}", print_bytes(cached.len() as f64));
-                        cache.set(&db_name, &cache_key, &cached)?
-                    }
-                    Ok(())
+                    Box::pin(async move {
+                        debug!(
+                            "uncompressed output: {}",
+                            print_bytes(uncompressed_size as f64)
+                        );
+                        if let Some(cached) = compressed {
+                            debug!("compressed output: {}", print_bytes(cached.len() as f64));
+                            cache
+                                .set(&cache_key, cached)
+                                .await
+                                .context("writing to cache")?
+                        }
+                        Ok(())
+                    })
                 }),
             )?;
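
Editor's note (not part of the commit): on a cache hit the stored blob is still zstd-compressed adapter output and is served through a streaming decoder, as in the Some(cached) arm above. A self-contained sketch of that compress-then-decode round trip, assuming the tokio and async-compression (zstd feature) crates already used elsewhere in this repository:

use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::tokio::write::ZstdEncoder;
use std::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // pretend this compressed blob is what was stored in the cache
    let mut enc = ZstdEncoder::new(Vec::new());
    enc.write_all(b"hello adapter output").await?;
    enc.shutdown().await?;
    let cached: Vec<u8> = enc.into_inner();

    // cache hit: wrap the blob in a streaming decoder, as adapt_caching does
    let mut reader = ZstdDecoder::new(Cursor::new(cached));
    let mut out = String::new();
    reader.read_to_string(&mut out).await?;
    assert_eq!(out, "hello adapter output");
    Ok(())
}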

View File

@@ -1,135 +1,179 @@
-use crate::{config::CacheConfig, print_bytes, print_dur};
-use anyhow::{format_err, Context, Result};
-use log::*;
-use rkv::backend::{BackendEnvironmentBuilder, LmdbEnvironment};
-use std::{fmt::Display, path::Path, time::Instant};
-
-pub trait PreprocCache: Send + Sync {
-    /*/// gets cache at specified key.
-    /// if cache hit, return the resulting data
-    /// else, run the given lambda, and store its result in the cache if present
-    fn get_or_run<'a>(
-        &mut self,
-        db_name: &str,
-        key: &[u8],
-        debug_name: &str,
-        runner: Box<dyn FnOnce() -> Result<Option<Vec<u8>>> + 'a>,
-    ) -> Result<Option<Vec<u8>>>;*/
-    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>>;
-    fn set(&mut self, db_name: &str, key: &[u8], value: &[u8]) -> Result<()>;
-}
-
-/// opens a LMDB cache
-fn open_cache_db(
-    path: &Path,
-) -> Result<std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>> {
-    std::fs::create_dir_all(path)?;
-    // use rkv::backend::LmdbEnvironmentFlags;
-
-    rkv::Manager::<LmdbEnvironment>::singleton()
-        .write()
-        .map_err(|_| format_err!("could not write cache db manager"))?
-        .get_or_create(path, |p| {
-            let mut builder = rkv::Rkv::environment_builder::<rkv::backend::Lmdb>();
-            builder
-                .set_flags(rkv::EnvironmentFlags::NO_SYNC)
-                .set_flags(rkv::EnvironmentFlags::WRITE_MAP) // not durable cuz it's a cache
-                // i'm not sure why NO_TLS is needed. otherwise LMDB transactions (open readers) will keep piling up until it fails with
-                // LmdbError(ReadersFull). Those "open readers" stay even after the corresponding processes exit.
-                // hope setting this doesn't break integrity
-                .set_flags(rkv::EnvironmentFlags::NO_TLS)
-                // sometimes, this seems to cause the data.mdb file to appear as 2GB in size (with holes), but sometimes not?
-                .set_map_size(2 * 1024 * 1024 * 1024)
-                .set_max_dbs(100)
-                .set_max_readers(128);
-            rkv::Rkv::from_builder(p, builder)
-        })
-        .map_err(|e| format_err!("could not get/create cache db: {}", e))
-}
-
-pub struct LmdbCache {
-    db_arc: std::sync::Arc<std::sync::RwLock<rkv::Rkv<LmdbEnvironment>>>,
-}
-
-impl LmdbCache {
-    pub fn open(config: &CacheConfig) -> Result<Option<LmdbCache>> {
-        if config.disabled {
-            return Ok(None);
-        }
-        let path = Path::new(&config.path.0);
-        Ok(Some(LmdbCache {
-            db_arc: open_cache_db(path)?,
-        }))
-    }
-}
-
-#[derive(Debug)]
-struct RkvErrWrap(rkv::StoreError);
-impl Display for RkvErrWrap {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
-    }
-}
-impl std::error::Error for RkvErrWrap {}
-
-impl PreprocCache for LmdbCache {
-    fn get(&self, db_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let start = Instant::now();
-        let db_env = self
-            .db_arc
-            .read()
-            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
-        let db = db_env
-            .open_single(db_name, rkv::store::Options::create())
-            .map_err(RkvErrWrap)
-            .context("could not open cache db store")?;
-
-        let reader = db_env.read().expect("could not get reader");
-        let cached = db
-            .get(&reader, key)
-            .map_err(RkvErrWrap)
-            .context("could not read from db")?;
-
-        match cached {
-            Some(rkv::Value::Blob(cached)) => {
-                debug!(
-                    "cache HIT, reading {} (compressed) from cache",
-                    print_bytes(cached.len() as f64)
-                );
-                debug!("reading from cache took {}", print_dur(start));
-                Ok(Some(Vec::from(cached)))
-            }
-            Some(_) => Err(format_err!("Integrity: value not blob"))?,
-            None => Ok(None),
-        }
-    }
-    fn set(&mut self, db_name: &str, key: &[u8], got: &[u8]) -> Result<()> {
-        let start = Instant::now();
-        debug!("writing {} to cache", print_bytes(got.len() as f64));
-        let db_env = self
-            .db_arc
-            .read()
-            .map_err(|_| anyhow::anyhow!("Could not open lock, some lock writer panicked"))?;
-        let db = db_env
-            .open_single(db_name, rkv::store::Options::create())
-            .map_err(RkvErrWrap)
-            .context("could not open cache db store")?;
-
-        let mut writer = db_env
-            .write()
-            .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not open write handle to cache"))?;
-        db.put(&mut writer, key, &rkv::Value::Blob(got))
-            .map_err(RkvErrWrap)
-            .with_context(|| format_err!("could not write to cache"))?;
-        writer
-            .commit()
-            .map_err(RkvErrWrap)
-            .with_context(|| "could not write cache".to_string())?;
-        debug!("writing to cache took {}", print_dur(start));
-        Ok(())
-    }
-}
+use crate::{adapters::FileAdapter, preproc::ActiveAdapters};
+use anyhow::{Context, Result};
+use log::*;
+use path_clean::PathClean;
+use rusqlite::{named_params, OptionalExtension};
+use std::{path::Path, time::UNIX_EPOCH};
+use tokio_rusqlite::Connection;
+
+#[derive(Clone)]
+pub struct CacheKey {
+    adapter: String,
+    adapter_version: i32,
+    active_adapters: String,
+    file_path: String,
+    file_mtime_unix_ms: i64,
+}
+impl CacheKey {
+    pub fn new(
+        filepath_hint: &Path,
+        adapter: &dyn FileAdapter,
+        active_adapters: &ActiveAdapters,
+    ) -> Result<CacheKey> {
+        let meta = std::fs::metadata(filepath_hint)
+            .with_context(|| format!("reading metadata for {}", filepath_hint.to_string_lossy()))?;
+        let modified = meta.modified().expect("weird OS that can't into mtime");
+        let file_mtime_unix_ms = modified.duration_since(UNIX_EPOCH)?.as_millis() as i64;
+        let active_adapters = if adapter.metadata().recurses {
+            serde_json::to_string(
+                &active_adapters
+                    .iter()
+                    .map(|a| format!("{}.v{}", a.metadata().name, a.metadata().version))
+                    .collect::<Vec<_>>(),
+            )?
+        } else {
+            "null".to_string()
+        };
+        Ok(CacheKey {
+            adapter: adapter.metadata().name.clone(),
+            adapter_version: adapter.metadata().version,
+            file_path: filepath_hint.clean().to_string_lossy().to_string(),
+            file_mtime_unix_ms,
+            active_adapters,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+pub trait PreprocCache {
+    async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>>;
+    async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()>;
+}
+
+async fn pragmas(db: &Connection) -> Result<()> {
+    // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/
+    //let want_page_size = 32768;
+    //db.execute(&format!("pragma page_size = {};", want_page_size))
+    //    .context("setup pragma 1")?;
+    db.call(|db| {
+        db.execute_batch(
+            "
+    pragma journal_mode = WAL;
+    pragma foreign_keys = on;
+    pragma temp_store = memory;
+    pragma synchronous = off; -- integrity isn't very important here
+    pragma mmap_size = 30000000000;
+    pragma application_id = 924716026;
+    pragma user_version = 2; -- todo: on upgrade clear db if version is unexpected
+    create table if not exists preproc_cache (
+        adapter text not null,
+        adapter_version integer not null,
+        created_unix_ms integer not null default (unixepoch() * 1000),
+        active_adapters text not null, -- 'null' if adapter cannot recurse
+        file_path text not null,
+        file_mtime_unix_ms integer not null,
+        text_content_zstd blob not null
+    ) strict;
+    create unique index if not exists preproc_cache_idx on preproc_cache (adapter, adapter_version, file_path, active_adapters);
+    ",
+        )
+    })
+    .await?;
+    /*let jm: String = db
+        .call(|db| db.pragma_query_value(None, "journal_mode", |r| r.get(0))?)
+        .await?;
+    if &jm != "wal" {
+        anyhow::bail!("journal mode is not wal");
+    }*/
+    Ok(())
+}
+
+struct SqliteCache {
+    db: Connection,
+}
+impl SqliteCache {
+    async fn new(path: &Path) -> Result<SqliteCache> {
+        let db = Connection::open(path.join("cache.sqlite3")).await?;
+        pragmas(&db).await?;
+        Ok(SqliteCache { db })
+    }
+}
+
+#[async_trait::async_trait]
+impl PreprocCache for SqliteCache {
+    async fn get(&self, key: &CacheKey) -> Result<Option<Vec<u8>>> {
+        let key = (*key).clone(); // todo: without cloning
+        Ok(self
+            .db
+            .call(move |db| {
+                db.query_row(
+                    "select text_content_zstd from preproc_cache where
+                        adapter = :adapter
+                        and adapter_version = :adapter_version
+                        and active_adapters = :active_adapters
+                        and file_path = :file_path
+                        and file_mtime_unix_ms = :file_mtime_unix_ms
+                    ",
+                    named_params! {
+                        ":adapter": &key.adapter,
+                        ":adapter_version": &key.adapter_version,
+                        ":active_adapters": &key.active_adapters,
+                        ":file_path": &key.file_path,
+                        ":file_mtime_unix_ms": &key.file_mtime_unix_ms
+                    },
+                    |r| r.get::<_, Vec<u8>>(0),
+                )
+                .optional()
+            })
+            .await
+            .context("reading from cache")?)
+    }
+
+    async fn set(&mut self, key: &CacheKey, value: Vec<u8>) -> Result<()> {
+        let key = (*key).clone(); // todo: without cloning
+        Ok(self
+            .db
+            .call(move |db| {
+                db.execute(
+                    "insert into preproc_cache (adapter, adapter_version, active_adapters, file_path, file_mtime_unix_ms, text_content_zstd) values
+                        (:adapter, :adapter_version, :active_adapters, :file_path, :file_mtime_unix_ms, :text_content_zstd)
+                    on conflict (adapter, adapter_version, active_adapters, file_path) do update set
+                        file_mtime_unix_ms = :file_mtime_unix_ms,
+                        created_unix_ms = unixepoch() * 1000,
+                        text_content_zstd = :text_content_zstd",
+                    named_params! {
+                        ":adapter": &key.adapter,
+                        ":adapter_version": &key.adapter_version,
+                        ":active_adapters": &key.active_adapters,
+                        ":file_path": &key.file_path,
+                        ":file_mtime_unix_ms": &key.file_mtime_unix_ms,
+                        ":text_content_zstd": value
+                    }
+                ).map(|_| ())
+            })
+            .await?)
+    }
+}
+
+/// opens a default cache
+pub async fn open_cache_db(path: &Path) -> Result<impl PreprocCache> {
+    std::fs::create_dir_all(path)?;
+    SqliteCache::new(path).await
+}
+
+#[cfg(test)]
+mod test {
+    use crate::preproc_cache::*;
+
+    #[tokio::test]
+    async fn test_read_write() -> anyhow::Result<()> {
+        let path = tempfile::tempdir()?;
+        let db = open_cache_db(&path.path().join("foo.sqlite3")).await?;
+        // db.set();
+        Ok(())
+    }
+}
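
Editor's note (not part of the commit): for a recursing adapter, the active_adapters column holds a JSON array of "name.vVersion" strings, and the literal string "null" otherwise, so the unique index on (adapter, adapter_version, file_path, active_adapters) distinguishes output produced under different adapter sets. An illustration of that encoding, with made-up adapter names and assuming serde_json as used in CacheKey::new:

fn main() -> anyhow::Result<()> {
    // hypothetical (name, version) pairs, not actual rga adapter names
    let adapters = vec![("zip", 1), ("tar", 1)];
    let active_adapters = serde_json::to_string(
        &adapters
            .iter()
            .map(|(name, version)| format!("{}.v{}", name, version))
            .collect::<Vec<_>>(),
    )?;
    assert_eq!(active_adapters, r#"["zip.v1","tar.v1"]"#);
    Ok(())
}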