Use async semaphore to limit parallel jobs

This commit is contained in:
FliegendeWurst 2021-04-19 12:32:35 +02:00
parent dd37cc9d91
commit bcc16f1ec5

View File

@ -12,18 +12,16 @@ use reqwest::{Client, Proxy};
use scraper::{ElementRef, Html, Selector}; use scraper::{ElementRef, Html, Selector};
use serde_json::json; use serde_json::json;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::fs; use tokio::{fs, sync::Semaphore};
use tokio::task::{self, JoinHandle}; use tokio::task::{self, JoinHandle};
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use url::Url; use url::Url;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::panic;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashSet, default::Default, sync::atomic::AtomicUsize}; use std::{collections::HashSet, default::Default, sync::atomic::AtomicUsize};
mod util; mod util;
@ -82,18 +80,11 @@ macro_rules! error {
async fn main() { async fn main() {
let mut opt = Opt::from_args(); let mut opt = Opt::from_args();
#[cfg(windows)] #[cfg(windows)]
colored::control::set_virtual_terminal(true); let _ = colored::control::set_virtual_terminal(true);
// use UNC paths on Windows // use UNC paths on Windows
opt.output = fs::canonicalize(opt.output).await.expect("failed to canonicalize directory"); opt.output = fs::canonicalize(opt.output).await.expect("failed to canonicalize directory");
LOG_LEVEL.store(opt.verbose, Ordering::SeqCst); LOG_LEVEL.store(opt.verbose, Ordering::SeqCst);
create_dir(&opt.output).await.expect("failed to create output directory"); create_dir(&opt.output).await.expect("failed to create output directory");
// need this because task scheduling is WIP
// (would wait forever on paniced task)
*PANIC_HOOK.lock() = panic::take_hook();
panic::set_hook(Box::new(|info| {
*TASKS_RUNNING.lock() -= 1;
PANIC_HOOK.lock()(info);
}));
// load .iliasignore file // load .iliasignore file
opt.output.push(".iliasignore"); opt.output.push(".iliasignore");
@ -133,6 +124,7 @@ async fn main() {
let ilias = Arc::new(ilias); let ilias = Arc::new(ilias);
let (tx, mut rx) = futures_channel::mpsc::unbounded::<JoinHandle<()>>(); let (tx, mut rx) = futures_channel::mpsc::unbounded::<JoinHandle<()>>();
*TASKS.lock() = Some(tx.clone()); *TASKS.lock() = Some(tx.clone());
TASKS_RUNNING.add_permits(ilias.opt.jobs);
if let Some(url) = ilias.opt.sync_url.as_ref() { if let Some(url) = ilias.opt.sync_url.as_ref() {
for item in ilias.get_course_content(&URL::from_href(url).expect("invalid URL")).await.expect("invalid response") { for item in ilias.get_course_content(&URL::from_href(url).expect("invalid URL")).await.expect("invalid response") {
let item = item.expect("invalid item"); let item = item.expect("invalid item");
@ -173,13 +165,12 @@ async fn main() {
lazy_static! { lazy_static! {
static ref TASKS: Mutex<Option<UnboundedSender<JoinHandle<()>>>> = Mutex::default(); static ref TASKS: Mutex<Option<UnboundedSender<JoinHandle<()>>>> = Mutex::default();
static ref TASKS_RUNNING: Mutex<usize> = Mutex::default(); static ref TASKS_RUNNING: Semaphore = Semaphore::new(0);
static ref PANIC_HOOK: Mutex<Box<dyn Fn(&panic::PanicInfo) + Sync + Send + 'static>> = Mutex::new(Box::new(|_| {}));
} }
macro_rules! spawn { macro_rules! spawn {
($e:expr) => { ($e:expr) => {
let _ = TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e)); TASKS.lock().as_ref().unwrap().unbounded_send(task::spawn($e)).unwrap();
}; };
} }
@ -234,22 +225,12 @@ fn process_gracefully(
path: PathBuf, path: PathBuf,
obj: Object, obj: Object,
) -> impl Future<Output = ()> + Send { async move { ) -> impl Future<Output = ()> + Send { async move {
loop { let permit = TASKS_RUNNING.acquire().await.unwrap();
{
// limit scope of lock
let mut running = TASKS_RUNNING.lock();
if *running < ilias.opt.jobs {
*running += 1;
break;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
let path_text = path.to_string_lossy().into_owned(); let path_text = path.to_string_lossy().into_owned();
if let Err(e) = process(ilias, path, obj).await.context("failed to process URL") { if let Err(e) = process(ilias, path, obj).await.context("failed to process URL") {
error!("Syncing {}", path_text; e); error!("Syncing {}", path_text; e);
} }
*TASKS_RUNNING.lock() -= 1; drop(permit);
}} }}
async fn handle_gracefully(fut: impl Future<Output = Result<()>>) { async fn handle_gracefully(fut: impl Future<Output = Result<()>>) {