use std::error::Error; use std::fmt::{Debug, Display, Formatter}; use futures::{stream, Stream, StreamExt}; use reqwest::{Client, IntoUrl, Method, RequestBuilder}; use crate::launcher::constants::USER_AGENT; pub trait Download: Debug + Display { type URLType: IntoUrl; // return Ok(None) to skip downloading this file fn get_url(&self) -> Self::URLType; async fn prepare(&mut self, req: RequestBuilder) -> Result, Box>; async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box>; async fn finish(&mut self) -> Result<(), Box>; } pub struct MultiDownloader { jobs: Vec, client: Client, nconcurrent: usize } #[derive(Debug, Clone, Copy)] pub enum Phase { Prepare, Send, Receive, HandleChunk, Finish } impl Display for Phase { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { /* an error occurred while (present participle) ... */ Self::Prepare => f.write_str("preparing the request"), Self::Send => f.write_str("sending the request"), Self::Receive => f.write_str("receiving response data"), Self::HandleChunk => f.write_str("handling response data"), Self::Finish => f.write_str("finishing the request"), } } } pub struct PhaseDownloadError<'j, T: Download> { phase: Phase, inner: Box, job: &'j T } impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("PhaseDownloadError") .field("phase", &self.phase) .field("inner", &self.inner) .field("job", &self.job) .finish() } } impl<'j, T: Download> Display for PhaseDownloadError<'j, T> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner) } } impl<'j, T: Download> Error for PhaseDownloadError<'j, T> { fn source(&self) -> Option<&(dyn Error + 'static)> { Some(&*self.inner) } } impl<'j, T: Download> PhaseDownloadError<'j, T> { fn new(phase: Phase, inner: Box, job: &'j T) -> Self { PhaseDownloadError { phase, inner, job } } } impl MultiDownloader { pub fn new(jobs: impl IntoIterator) -> MultiDownloader { Self::with_concurrent(jobs, 8) } pub fn with_concurrent(jobs: impl IntoIterator, n: usize) -> MultiDownloader { assert!(n > 0); MultiDownloader { jobs: jobs.into_iter().collect(), client: Client::new(), nconcurrent: n } } pub async fn perform(&mut self) -> impl Stream>> { stream::iter(self.jobs.iter_mut()).map(|job| { let client = &self.client; async move { let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await { Ok(opt) => match opt { Some(rq) => rq, None => return Ok(()) }, Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job)) }; let mut data = match rq.send().await { Ok(data) => match data.error_for_status() { Ok(data) => data.bytes_stream(), Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) }, Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) }; while let Some(bytes) = data.next().await { let bytes = match bytes { Ok(bytes) => bytes, Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job)) }; match job.handle_chunk(bytes.as_ref()).await { Ok(_) => (), Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job)) } } job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?; Ok(()) } }).buffer_unordered(self.nconcurrent) } }