Wait for queued network jobs

This commit is contained in:
FliegendeWurst 2020-04-22 12:59:20 +02:00
parent dc55666255
commit af0743184f
3 changed files with 24 additions and 10 deletions

6
Cargo.lock generated
View File

@ -2,7 +2,7 @@
# It is not intended for manual editing. # It is not intended for manual editing.
[[package]] [[package]]
name = "KIT-ILIAS-downloader" name = "KIT-ILIAS-downloader"
version = "0.2.0" version = "0.2.1"
dependencies = [ dependencies = [
"error-chain", "error-chain",
"futures-util", "futures-util",
@ -823,9 +823,9 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot_core" name = "parking_lot_core"
version = "0.7.1" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e136c1904604defe99ce5fd71a28d473fa60a12255d511aa78a9ddf11237aeb" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cloudabi", "cloudabi",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "KIT-ILIAS-downloader" name = "KIT-ILIAS-downloader"
version = "0.2.0" version = "0.2.1"
authors = ["FliegendeWurst <2012gdwu@web.de>"] authors = ["FliegendeWurst <2012gdwu@web.de>"]
license = "GPL-2.0" license = "GPL-2.0"
edition = "2018" edition = "2018"

View File

@ -320,6 +320,7 @@ async fn main() {
*PANIC_HOOK.lock() = panic::take_hook(); *PANIC_HOOK.lock() = panic::take_hook();
panic::set_hook(Box::new(|info| { panic::set_hook(Box::new(|info| {
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
PANIC_HOOK.lock()(info); PANIC_HOOK.lock()(info);
})); }));
let user = rprompt::prompt_reply_stdout("Username: ").unwrap(); let user = rprompt::prompt_reply_stdout("Username: ").unwrap();
@ -339,12 +340,14 @@ async fn main() {
while let Some((path, obj)) = queue.pop_front() { while let Some((path, obj)) = queue.pop_front() {
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async { task::spawn(async {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
process(ilias, path, obj).await; process(ilias, path, obj).await;
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
while *TASKS_RUNNING.lock() > 0 { while *TASKS_RUNNING.lock() > 0 {
@ -353,6 +356,7 @@ async fn main() {
} }
lazy_static!{ lazy_static!{
static ref TASKS_QUEUED: Mutex<usize> = Mutex::default();
static ref TASKS_RUNNING: Mutex<usize> = Mutex::default(); static ref TASKS_RUNNING: Mutex<usize> = Mutex::default();
static ref PANIC_HOOK: Mutex<Box<dyn Fn(&panic::PanicInfo) + Sync + Send + 'static>> = Mutex::new(Box::new(|_| {})); static ref PANIC_HOOK: Mutex<Box<dyn Fn(&panic::PanicInfo) + Sync + Send + 'static>> = Mutex::new(Box::new(|_| {}));
@ -377,12 +381,14 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
path.push(item.name()); path.push(item.name());
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async { task::spawn(async {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
process(ilias, path, item).await; process(ilias, path, item).await;
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
}, },
@ -398,12 +404,14 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
path.push(item.name()); path.push(item.name());
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async { task::spawn(async {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
process(ilias, path, item).await; process(ilias, path, item).await;
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
}, },
@ -469,12 +477,14 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
}; };
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async { task::spawn(async {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
process(ilias, path, video).await; process(ilias, path, video).await;
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
@ -563,12 +573,14 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
path.push(name); path.push(name);
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async { task::spawn(async {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
process(ilias, path, object).await; process(ilias, path, object).await;
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
}, },
@ -610,7 +622,8 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
path.push(name); path.push(name);
let ilias = Arc::clone(&ilias); let ilias = Arc::clone(&ilias);
task::spawn(async move { task::spawn(async move {
while *TASKS_RUNNING.lock() > ilias.opt.jobs { *TASKS_QUEUED.lock() += 1;
while *TASKS_RUNNING.lock() >= ilias.opt.jobs {
tokio::time::delay_for(Duration::from_millis(100)).await; tokio::time::delay_for(Duration::from_millis(100)).await;
} }
*TASKS_RUNNING.lock() += 1; *TASKS_RUNNING.lock() += 1;
@ -621,6 +634,7 @@ fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> impl std::future::F
let mut file = BufWriter::new(file); let mut file = BufWriter::new(file);
tokio::io::copy(&mut data.as_bytes(), &mut file).await.unwrap(); tokio::io::copy(&mut data.as_bytes(), &mut file).await.unwrap();
*TASKS_RUNNING.lock() -= 1; *TASKS_RUNNING.lock() -= 1;
*TASKS_QUEUED.lock() -= 1;
}); });
} }
}, },