diff --git a/src/main.rs b/src/main.rs index e33bbd3..e5eef11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result, anyhow}; -use futures_util::stream::TryStreamExt; +use futures_channel::mpsc::UnboundedSender; +use futures_util::{StreamExt, stream::TryStreamExt}; use ignore::gitignore::Gitignore; use lazy_static::lazy_static; use parking_lot::Mutex; @@ -10,7 +11,7 @@ use serde_json::json; use structopt::StructOpt; use tokio::fs; use tokio::io::stream_reader; -use tokio::task; +use tokio::task::{self, JoinHandle}; use url::Url; use std::default::Default; @@ -35,7 +36,6 @@ async fn main() { *PANIC_HOOK.lock() = panic::take_hook(); panic::set_hook(Box::new(|info| { *TASKS_RUNNING.lock() -= 1; - *TASKS_QUEUED.lock() -= 1; PANIC_HOOK.lock()(info); })); @@ -81,21 +81,21 @@ async fn main() { } let ilias = Arc::new(ilias); let desktop = ilias.personal_desktop().await.context("Failed to load personal desktop"); + let (tx, mut rx) = futures_channel::mpsc::unbounded::>(); + *TASKS.lock() = Some(tx.clone()); match desktop { Ok(desktop) => { for item in desktop.items { let mut path = ilias.opt.output.clone(); path.push(file_escape(item.name())); let ilias = Arc::clone(&ilias); - task::spawn(process_gracefully(ilias, path, item)); + let _ = tx.unbounded_send(task::spawn(process_gracefully(ilias, path, item))); } }, Err(e) => println!("{:?}", e) } - // TODO: do this with tokio - // https://github.com/tokio-rs/tokio/issues/2039 - while *TASKS_QUEUED.lock() > 0 { - tokio::time::delay_for(Duration::from_millis(500)).await; + while let Some(task) = rx.next().await { + let _ = task.await; } if ilias.opt.content_tree { // restore fast page loading times @@ -106,12 +106,18 @@ async fn main() { } lazy_static!{ - static ref TASKS_QUEUED: Mutex = Mutex::default(); + static ref TASKS: Mutex>>> = Mutex::default(); static ref TASKS_RUNNING: Mutex = Mutex::default(); static ref PANIC_HOOK: Mutex> = Mutex::new(Box::new(|_| {})); } +macro_rules! spawn { + ($e:expr) => { + let _ = TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e)); + } +} + fn ask_user_pass() -> (String, String) { let user = rprompt::prompt_reply_stdout("Username: ").expect("username prompt"); let pass = rpassword::read_password_from_tty(Some("Password: ")).expect("password prompt"); @@ -119,7 +125,6 @@ fn ask_user_pass() -> (String, String) { } fn process_gracefully(ilias: Arc, path: PathBuf, obj: Object) -> impl Future + Send { async move { - *TASKS_QUEUED.lock() += 1; loop { { // limit scope of lock let mut running = TASKS_RUNNING.lock(); @@ -135,7 +140,6 @@ fn process_gracefully(ilias: Arc, path: PathBuf, obj: Object) -> impl Fut println!("Syncing {}: {:?}", path_text, e); } *TASKS_RUNNING.lock() -= 1; - *TASKS_QUEUED.lock() -= 1; }} async fn handle_gracefully(fut: impl Future>) { @@ -220,7 +224,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl Future { @@ -235,7 +239,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl Future { @@ -307,7 +311,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl Future, mut path: PathBuf, obj: Object) -> impl Future 0 { log!(0, "Ignoring older threads in {:?}..", path); @@ -431,7 +435,7 @@ fn process(ilias: Arc, mut path: PathBuf, obj: Object) -> impl Future, mut path: PathBuf, obj: Object) -> impl Future, mut path: PathBuf, obj: Object) -> impl Future {