diff --git a/src/main.rs b/src/main.rs index a670979..811cc6e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,19 +8,22 @@ use reqwest::Client; use scraper::{ElementRef, Html, Selector}; use serde_json::json; use structopt::StructOpt; -use tokio::fs::File as AsyncFile; -use tokio::io::{stream_reader, BufWriter}; +use tokio::io::stream_reader; use tokio::task; use url::Url; use std::default::Default; use std::fs; +use std::future::Future; use std::io; use std::panic; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +mod util; +use util::*; + const ILIAS_URL: &'static str = "https://ilias.studium.kit.edu/"; #[tokio::main] @@ -85,20 +88,26 @@ lazy_static!{ static ref PANIC_HOOK: Mutex> = Mutex::new(Box::new(|_| {})); } -fn process_gracefully(ilias: Arc, path: PathBuf, obj: Object) -> impl std::future::Future + Send { async move { +fn process_gracefully(ilias: Arc, path: PathBuf, obj: Object) -> impl Future + Send { async move { *TASKS_QUEUED.lock() += 1; while *TASKS_RUNNING.lock() >= ilias.opt.jobs { tokio::time::delay_for(Duration::from_millis(100)).await; } *TASKS_RUNNING.lock() += 1; - let path_text = format!("{:?}", path); - if let Err(e) = process(ilias, path, obj).await.context("Failed to process URL") { + let path_text = path.to_string_lossy().into_owned(); + if let Err(e) = process(ilias, path, obj).await.context("failed to process URL") { println!("Syncing {}: {:?}", path_text, e); } *TASKS_RUNNING.lock() -= 1; *TASKS_QUEUED.lock() -= 1; }} +async fn handle_gracefully(fut: impl Future>) { + if let Err(e) = fut.await { + println!("Error: {:?}", e); + } +} + #[allow(non_upper_case_globals)] mod selectors { use lazy_static::lazy_static; @@ -131,7 +140,7 @@ use crate::selectors::*; // see https://github.com/rust-lang/rust/issues/53690#issuecomment-418911229 //async fn process(ilias: Arc, path: PathBuf, obj: Object) { -fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl std::future::Future> + Send { async move { +fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl Future> + Send { async move { let log_level = ilias.opt.verbose; macro_rules! log { ($lvl:expr, $($arg:expr),*) => { @@ -218,9 +227,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl std::futur io::Error::new(io::ErrorKind::Other, x) })); log!(0, "Writing {}", relative_path.to_string_lossy()); - let file = AsyncFile::create(&path).await?; - let mut file = BufWriter::new(file); - tokio::io::copy(&mut reader, &mut file).await?; + write_file_data(&path, &mut reader).await.context("failed to save file")?; }, PluginDispatch { url, .. } => { if ilias.opt.no_videos { @@ -303,9 +310,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl std::futur io::Error::new(io::ErrorKind::Other, x) })); log!(0, "Writing {}", relative_path.to_string_lossy()); - let file = AsyncFile::create(&path).await?; - let mut file = BufWriter::new(file); - tokio::io::copy(&mut reader, &mut file).await?; + write_file_data(&path, &mut reader).await.context("failed to save video")?; }, Forum { url, .. } => { if !ilias.opt.forum { @@ -391,25 +396,11 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl std::futur let data = data.inner_html(); let mut path = path.clone(); path.push(name); - let ilias = Arc::clone(&ilias); task::spawn(async move { - *TASKS_QUEUED.lock() += 1; - while *TASKS_RUNNING.lock() >= ilias.opt.jobs { - tokio::time::delay_for(Duration::from_millis(100)).await; - } - *TASKS_RUNNING.lock() += 1; - log!(2, "Writing to {:?}..", path); - let file = AsyncFile::create(&path).await; - if file.is_err() { - log!(0, "Error creating file {:?}: {}", path, file.err().unwrap()); - return; - } - let mut file = BufWriter::new(file.unwrap()); - if let Err(e) = tokio::io::copy(&mut data.as_bytes(), &mut file).await { - log!(0, "Error writing to {:?}: {}", path, e); - } - *TASKS_RUNNING.lock() -= 1; - *TASKS_QUEUED.lock() -= 1; + handle_gracefully(async move { + write_file_data(&path, &mut data.as_bytes()).await + .context("failed to write forum post") + }).await; }); } // pagination @@ -503,16 +494,12 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl std::futur let head = head.unwrap(); let url = head.url().as_str(); path.push(name); - let file = AsyncFile::create(&path).await?; - let mut file = BufWriter::new(file); - tokio::io::copy(&mut url.as_bytes(), &mut file).await?; + write_file_data(&path, &mut url.as_bytes()).await?; path.pop(); } } else { log!(0, "Writing {}", relative_path.to_string_lossy()); - let file = AsyncFile::create(&path).await?; - let mut file = BufWriter::new(file); - tokio::io::copy(&mut url.as_bytes(), &mut file).await?; + write_file_data(&path, &mut url.as_bytes()).await.context("failed to save weblink")?; } }, Wiki { .. } => { diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..dd8f758 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,15 @@ +use anyhow::Context; +use tokio::fs::File as AsyncFile; +use tokio::io::{AsyncRead, BufWriter}; + +use std::path::Path; + +use super::Result; + +pub async fn write_file_data(path: &Path, data: &mut R) -> Result<()> +where R: AsyncRead + Unpin { + let file = AsyncFile::create(&path).await.context("failed to create file")?; + let mut file = BufWriter::new(file); + tokio::io::copy(data, &mut file).await.context("failed to write to file")?; + Ok(()) +} \ No newline at end of file