Request rate limiting

This commit is contained in:
FliegendeWurst 2021-05-07 09:15:24 +02:00
parent 81543b1d39
commit dc02ec7a46

View File

@ -8,11 +8,11 @@ use futures_util::{stream::TryStreamExt, StreamExt};
use ignore::gitignore::Gitignore; use ignore::gitignore::Gitignore;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::{Lazy, OnceCell};
use reqwest::{Client, Proxy}; use reqwest::{Client, IntoUrl, Proxy};
use scraper::{ElementRef, Html, Selector}; use scraper::{ElementRef, Html, Selector};
use serde_json::json; use serde_json::json;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{fs, sync::Semaphore}; use tokio::{fs, sync::Semaphore, time};
use tokio::task::{self, JoinHandle}; use tokio::task::{self, JoinHandle};
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use url::Url; use url::Url;
@ -36,6 +36,11 @@ static PROGRESS_BAR: Lazy<ProgressBar> = Lazy::new(|| ProgressBar::new(0));
/// Global job queue /// Global job queue
static TASKS: OnceCell<UnboundedSender<JoinHandle<()>>> = OnceCell::new(); static TASKS: OnceCell<UnboundedSender<JoinHandle<()>>> = OnceCell::new();
static TASKS_RUNNING: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0)); static TASKS_RUNNING: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0));
static REQUEST_TICKETS: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0));
async fn get_request_ticket() {
REQUEST_TICKETS.acquire().await.unwrap().forget();
}
macro_rules! spawn { macro_rules! spawn {
($e:expr) => { ($e:expr) => {
@ -95,6 +100,14 @@ macro_rules! error {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let opt = Opt::from_args(); let opt = Opt::from_args();
let rate = opt.rate;
task::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs_f64(60.0 / rate as f64));
loop {
interval.tick().await;
REQUEST_TICKETS.add_permits(1);
}
});
if let Err(e) = real_main(opt).await { if let Err(e) = real_main(opt).await {
error!(e); error!(e);
} }
@ -465,9 +478,7 @@ async fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> Result<()> {
let meta = fs::metadata(&path).await; let meta = fs::metadata(&path).await;
if !ilias.opt.force && meta.is_ok() && ilias.opt.check_videos { if !ilias.opt.force && meta.is_ok() && ilias.opt.check_videos {
let head = ilias let head = ilias
.client
.head(url) .head(url)
.send()
.await .await
.context("HEAD request failed")?; .context("HEAD request failed")?;
if let Some(len) = head.headers().get("content-length") { if let Some(len) = head.headers().get("content-length") {
@ -757,7 +768,7 @@ async fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> Result<()> {
log!(2, "Skipping download, link exists already"); log!(2, "Skipping download, link exists already");
return Ok(()); return Ok(());
} }
let head_req_result = ilias.client.head(&url.url).send().await; let head_req_result = ilias.head(&url.url).await;
let url = match &head_req_result { let url = match &head_req_result {
Err(e) => e.url().context("HEAD request failed")?.as_str(), Err(e) => e.url().context("HEAD request failed")?.as_str(),
Ok(head) => head.url().as_str(), Ok(head) => head.url().as_str(),
@ -790,7 +801,7 @@ async fn process(ilias: Arc<ILIAS>, path: PathBuf, obj: Object) -> Result<()> {
continue; continue;
} }
let head = ilias.client.head(url.url.as_str()).send().await.context("HEAD request to web link failed"); let head = ilias.head(url.url.as_str()).await.context("HEAD request to web link failed");
if let Some(err) = head.as_ref().err() { if let Some(err) = head.as_ref().err() {
warning!(err); warning!(err);
continue; continue;
@ -839,7 +850,7 @@ struct Opt {
#[structopt(short)] #[structopt(short)]
force: bool, force: bool,
/// Use content tree (slow but thorough) /// Use content tree (experimental)
#[structopt(long)] #[structopt(long)]
content_tree: bool, content_tree: bool,
@ -879,6 +890,10 @@ struct Opt {
/// ILIAS page to download /// ILIAS page to download
#[structopt(long)] #[structopt(long)]
sync_url: Option<String>, sync_url: Option<String>,
/// Requests per minute
#[structopt(long, default_value = "8")]
rate: usize
} }
struct ILIAS { struct ILIAS {
@ -952,6 +967,7 @@ impl ILIAS {
} }
async fn download(&self, url: &str) -> Result<reqwest::Response> { async fn download(&self, url: &str) -> Result<reqwest::Response> {
get_request_ticket().await;
log!(2, "Downloading {}", url); log!(2, "Downloading {}", url);
if url.starts_with("http") || url.starts_with("ilias.studium.kit.edu") { if url.starts_with("http") || url.starts_with("ilias.studium.kit.edu") {
Ok(self.client.get(url).send().await?) Ok(self.client.get(url).send().await?)
@ -960,6 +976,11 @@ impl ILIAS {
} }
} }
async fn head<U: IntoUrl>(&self, url: U) -> Result<reqwest::Response, reqwest::Error> {
get_request_ticket().await;
self.client.head(url).send().await
}
async fn get_html(&self, url: &str) -> Result<Html> { async fn get_html(&self, url: &str) -> Result<Html> {
let text = self.download(url).await?.text().await?; let text = self.download(url).await?.text().await?;
let html = Html::parse_document(&text); let html = Html::parse_document(&text);