diff --git a/src/main.rs b/src/main.rs index 9d103ee..6e2dbd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,18 +12,16 @@ use reqwest::{Client, Proxy}; use scraper::{ElementRef, Html, Selector}; use serde_json::json; use structopt::StructOpt; -use tokio::fs; +use tokio::{fs, sync::Semaphore}; use tokio::task::{self, JoinHandle}; use tokio_util::io::StreamReader; use url::Url; use std::future::Future; use std::io; -use std::panic; use std::path::PathBuf; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::Duration; use std::{collections::HashSet, default::Default, sync::atomic::AtomicUsize}; mod util; @@ -82,18 +80,11 @@ macro_rules! error { async fn main() { let mut opt = Opt::from_args(); #[cfg(windows)] - colored::control::set_virtual_terminal(true); + let _ = colored::control::set_virtual_terminal(true); // use UNC paths on Windows opt.output = fs::canonicalize(opt.output).await.expect("failed to canonicalize directory"); LOG_LEVEL.store(opt.verbose, Ordering::SeqCst); create_dir(&opt.output).await.expect("failed to create output directory"); - // need this because task scheduling is WIP - // (would wait forever on paniced task) - *PANIC_HOOK.lock() = panic::take_hook(); - panic::set_hook(Box::new(|info| { - *TASKS_RUNNING.lock() -= 1; - PANIC_HOOK.lock()(info); - })); // load .iliasignore file opt.output.push(".iliasignore"); @@ -133,6 +124,7 @@ async fn main() { let ilias = Arc::new(ilias); let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); *TASKS.lock() = Some(tx.clone()); + TASKS_RUNNING.add_permits(ilias.opt.jobs); if let Some(url) = ilias.opt.sync_url.as_ref() { for item in ilias.get_course_content(&URL::from_href(url).expect("invalid URL")).await.expect("invalid response") { let item = item.expect("invalid item"); @@ -173,13 +165,12 @@ async fn main() { lazy_static! { static ref TASKS: Mutex>>> = Mutex::default(); - static ref TASKS_RUNNING: Mutex = Mutex::default(); - static ref PANIC_HOOK: Mutex> = Mutex::new(Box::new(|_| {})); + static ref TASKS_RUNNING: Semaphore = Semaphore::new(0); } macro_rules! spawn { ($e:expr) => { - let _ = TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e)); + TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e)).unwrap(); }; } @@ -234,22 +225,12 @@ fn process_gracefully( path: PathBuf, obj: Object, ) -> impl Future + Send { async move { - loop { - { - // limit scope of lock - let mut running = TASKS_RUNNING.lock(); - if *running < ilias.opt.jobs { - *running += 1; - break; - } - } - tokio::time::sleep(Duration::from_millis(50)).await; - } + let permit = TASKS_RUNNING.acquire().await.unwrap(); let path_text = path.to_string_lossy().into_owned(); if let Err(e) = process(ilias, path, obj).await.context("failed to process URL") { error!("Syncing {}", path_text; e); } - *TASKS_RUNNING.lock() -= 1; + drop(permit); }} async fn handle_gracefully(fut: impl Future>) {