Wait on all spawned tasks

This commit is contained in:
FliegendeWurst 2020-11-28 13:12:42 +01:00
parent 377cef7cf2
commit 09f69b9efd

View File

@ -1,5 +1,6 @@
use anyhow::{Context, Result, anyhow};
use futures_util::stream::TryStreamExt;
use futures_channel::mpsc::UnboundedSender;
use futures_util::{StreamExt, stream::TryStreamExt};
use ignore::gitignore::Gitignore;
use lazy_static::lazy_static;
use parking_lot::Mutex;
@ -10,7 +11,7 @@ use serde_json::json;
use structopt::StructOpt;
use tokio::fs;
use tokio::io::stream_reader;
use tokio::task;
use tokio::task::{self, JoinHandle};
use url::Url;
use std::default::Default;
@ -35,7 +36,6 @@ async fn main() {
*PANIC_HOOK.lock() = panic::take_hook();
panic::set_hook(Box::new(|info| {
*TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
PANIC_HOOK.lock()(info);
}));
@ -81,21 +81,21 @@ async fn main() {
}
let ilias = Arc::new(ilias);
let desktop = ilias.personal_desktop().await.context("Failed to load personal desktop");
let (tx, mut rx) = futures_channel::mpsc::unbounded::<JoinHandle<()>>();
*TASKS.lock() = Some(tx.clone());
match desktop {
Ok(desktop) => {
for item in desktop.items {
let mut path = ilias.opt.output.clone();
path.push(file_escape(item.name()));
let ilias = Arc::clone(&ilias);
task::spawn(process_gracefully(ilias, path, item));
let _ = tx.unbounded_send(task::spawn(process_gracefully(ilias, path, item)));
}
},
Err(e) => println!("{:?}", e)
}
// TODO: do this with tokio
// https://github.com/tokio-rs/tokio/issues/2039
while *TASKS_QUEUED.lock() > 0 {
tokio::time::delay_for(Duration::from_millis(500)).await;
while let Some(task) = rx.next().await {
let _ = task.await;
}
if ilias.opt.content_tree {
// restore fast page loading times
@ -106,12 +106,18 @@ async fn main() {
}
lazy_static!{
static ref TASKS_QUEUED: Mutex<usize> = Mutex::default();
static ref TASKS: Mutex<Option<UnboundedSender<JoinHandle<()>>>> = Mutex::default();
static ref TASKS_RUNNING: Mutex<usize> = Mutex::default();
static ref PANIC_HOOK: Mutex<Box<dyn Fn(&panic::PanicInfo) + Sync + Send + 'static>> = Mutex::new(Box::new(|_| {}));
}
macro_rules! spawn {
($e:expr) => {
let _ = TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e));
}
}
fn ask_user_pass() -> (String, String) {
let user = rprompt::prompt_reply_stdout("Username: ").expect("username prompt");
let pass = rpassword::read_password_from_tty(Some("Password: ")).expect("password prompt");
@ -119,7 +125,6 @@ fn ask_user_pass() -> (String, String) {
}
fn process_gracefully(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl Future<Output = ()> + Send { async move {
*TASKS_QUEUED.lock() += 1;
loop {
{ // limit scope of lock
let mut running = TASKS_RUNNING.lock();
@ -135,7 +140,6 @@ fn process_gracefully(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl Fut
println!("Syncing {}: {:?}", path_text, e);
}
*TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}}
async fn handle_gracefully(fut: impl Future<Output = Result<()>>) {
@ -220,7 +224,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
let mut path = path.clone();
path.push(file_escape(item.name()));
let ilias = Arc::clone(&ilias);
task::spawn(process_gracefully(ilias, path, item));
spawn!(process_gracefully(ilias, path, item));
}
},
Folder { url, .. } => {
@ -235,7 +239,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
let mut path = path.clone();
path.push(file_escape(item.name()));
let ilias = Arc::clone(&ilias);
task::spawn(process_gracefully(ilias, path, item));
spawn!(process_gracefully(ilias, path, item));
}
},
File { url, .. } => {
@ -307,7 +311,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
url: URL::raw(link.value().attr("href").ok_or(anyhow!("video link without href"))?.to_owned())
};
let ilias = Arc::clone(&ilias);
task::spawn(async {
spawn!(async {
process_gracefully(ilias, path, video).await;
});
}
@ -407,7 +411,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
}
log!(0, "New posts in {:?}..", path);
let ilias = Arc::clone(&ilias);
task::spawn(process_gracefully(ilias, path, object));
spawn!(process_gracefully(ilias, path, object));
}
if html.select(&forum_pages).count() > 0 {
log!(0, "Ignoring older threads in {:?}..", path);
@ -431,7 +435,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
let data = data.inner_html();
let mut path = path.clone();
path.push(file_escape(&name));
task::spawn(handle_gracefully(async move {
spawn!(handle_gracefully(async move {
write_file_data(&path, &mut data.as_bytes()).await
.context("failed to write forum post")
}));
@ -446,7 +450,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
let next_page = Thread {
url: URL::from_href(last.value().attr("href").ok_or(anyhow!("page link not found"))?)?
};
task::spawn(process_gracefully(ilias, path, next_page));
spawn!(process_gracefully(ilias, path, next_page));
}
} else {
log!(0, "Warning: unable to find pagination links in {}", url.url);
@ -478,7 +482,7 @@ fn process(ilias: Arc<ILIAS>, mut path: PathBuf, obj: Object) -> impl Future<Out
let mut path = path.clone();
path.push(file_escape(item.name()));
let ilias = Arc::clone(&ilias);
task::spawn(process_gracefully(ilias, path, item));
spawn!(process_gracefully(ilias, path, item));
}
},
Weblink { url, .. } => {