2020-06-06 10:57:43 +00:00
|
|
|
use anyhow::Result;
|
2019-06-07 21:17:33 +00:00
|
|
|
use log::*;
|
2019-06-06 21:43:30 +00:00
|
|
|
use std::io::Write;
|
2019-06-05 14:43:40 +00:00
|
|
|
|
|
|
|
/**
|
|
|
|
* wrap a writer so that it is passthrough,
|
2019-06-07 17:00:24 +00:00
|
|
|
* but also the written data is compressed and written into a buffer,
|
|
|
|
* unless more than max_cache_size bytes is written, then the cache is dropped and it is pure passthrough.
|
2019-06-05 14:43:40 +00:00
|
|
|
*/
|
|
|
|
pub struct CachingWriter<W: Write> {
|
|
|
|
max_cache_size: usize,
|
|
|
|
zstd_writer: Option<zstd::stream::write::Encoder<Vec<u8>>>,
|
|
|
|
out: W,
|
2020-06-09 10:47:34 +00:00
|
|
|
bytes_written: u64,
|
2019-06-05 14:43:40 +00:00
|
|
|
}
|
|
|
|
impl<W: Write> CachingWriter<W> {
|
2020-06-06 10:57:43 +00:00
|
|
|
pub fn new(out: W, max_cache_size: usize, compression_level: i32) -> Result<CachingWriter<W>> {
|
2019-06-05 14:43:40 +00:00
|
|
|
Ok(CachingWriter {
|
|
|
|
out,
|
|
|
|
max_cache_size,
|
|
|
|
zstd_writer: Some(zstd::stream::write::Encoder::new(
|
|
|
|
Vec::new(),
|
|
|
|
compression_level,
|
|
|
|
)?),
|
2020-06-09 10:47:34 +00:00
|
|
|
bytes_written: 0,
|
2019-06-05 14:43:40 +00:00
|
|
|
})
|
|
|
|
}
|
2020-06-09 10:47:34 +00:00
|
|
|
pub fn finish(self) -> std::io::Result<(u64, Option<Vec<u8>>)> {
|
2019-06-05 14:43:40 +00:00
|
|
|
if let Some(writer) = self.zstd_writer {
|
|
|
|
let res = writer.finish()?;
|
|
|
|
if res.len() <= self.max_cache_size {
|
2020-06-09 10:47:34 +00:00
|
|
|
return Ok((self.bytes_written, Some(res)));
|
2019-06-05 14:43:40 +00:00
|
|
|
}
|
|
|
|
}
|
2020-06-09 10:47:34 +00:00
|
|
|
Ok((self.bytes_written, None))
|
2019-06-05 14:43:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
impl<W: Write> Write for CachingWriter<W> {
|
|
|
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
2020-06-09 10:47:34 +00:00
|
|
|
let written_bytes = match self.zstd_writer.as_mut() {
|
2019-06-05 14:43:40 +00:00
|
|
|
Some(writer) => {
|
|
|
|
let wrote = writer.write(buf)?;
|
|
|
|
let compressed_len = writer.get_ref().len();
|
2019-06-11 11:34:04 +00:00
|
|
|
trace!("wrote {} to zstd, len now {}", wrote, compressed_len);
|
2019-06-05 14:43:40 +00:00
|
|
|
if compressed_len > self.max_cache_size {
|
2019-06-18 10:14:09 +00:00
|
|
|
debug!("cache longer than max, dropping");
|
2019-06-05 14:43:40 +00:00
|
|
|
//writer.finish();
|
|
|
|
self.zstd_writer.take().unwrap().finish()?;
|
|
|
|
}
|
|
|
|
self.out.write_all(&buf[0..wrote])?;
|
2019-06-06 21:43:30 +00:00
|
|
|
Ok(wrote)
|
2019-06-05 14:43:40 +00:00
|
|
|
}
|
|
|
|
None => self.out.write(buf),
|
2020-06-09 10:47:34 +00:00
|
|
|
}?;
|
|
|
|
self.bytes_written += written_bytes as u64;
|
|
|
|
Ok(written_bytes)
|
2019-06-05 14:43:40 +00:00
|
|
|
}
|
|
|
|
fn flush(&mut self) -> std::io::Result<()> {
|
2019-06-18 10:14:09 +00:00
|
|
|
debug!("flushing");
|
2019-06-05 14:43:40 +00:00
|
|
|
if let Some(writer) = self.zstd_writer.as_mut() {
|
|
|
|
writer.flush()?;
|
|
|
|
}
|
|
|
|
self.out.flush()
|
|
|
|
}
|
|
|
|
}
|