From 9f1a7c45dcee1268f6b222dd2fac4ec083a34d9a Mon Sep 17 00:00:00 2001 From: bigfoot547 Date: Sat, 11 Jan 2025 01:50:48 -0600 Subject: re-add reqwest (mistakes were made) --- src/launcher/download.rs | 144 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 22 deletions(-) (limited to 'src/launcher/download.rs') diff --git a/src/launcher/download.rs b/src/launcher/download.rs index 4294d33..3598976 100644 --- a/src/launcher/download.rs +++ b/src/launcher/download.rs @@ -1,36 +1,136 @@ -use std::path::{Path, PathBuf}; -use sha1_smol::Digest; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use futures::{stream, Stream, StreamExt}; +use reqwest::{Client, IntoUrl, Method, RequestBuilder}; +use crate::launcher::constants::USER_AGENT; -pub trait Download { - fn get_url(&self) -> &str; - fn get_path(&self) -> &Path; - fn get_expect_digest(&self) -> Option; - fn get_expect_size(&self) -> Option; +pub trait Download: Debug + Display { + type URLType: IntoUrl; + // return Ok(None) to skip downloading this file - fn always_redownload(&self) -> bool; + fn get_url(&self) -> Self::URLType; + + async fn prepare(&mut self, req: RequestBuilder) -> Result, Box>; + async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box>; + async fn finish(&mut self) -> Result<(), Box>; +} + +pub struct MultiDownloader { + jobs: Vec, + client: Client, + nconcurrent: usize +} + +#[derive(Debug, Clone, Copy)] +pub enum Phase { + Prepare, + Send, + Receive, + HandleChunk, + Finish } -pub type DownloadJob = dyn Download + Sync + Send; +impl Display for Phase { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + /* an error occurred while (present participle) ... */ + Self::Prepare => f.write_str("preparing the request"), + Self::Send => f.write_str("sending the request"), + Self::Receive => f.write_str("receiving response data"), + Self::HandleChunk => f.write_str("handling response data"), + Self::Finish => f.write_str("finishing the request"), + } + } +} -pub struct MultiDownloader<'j, 'js> { - jobs: &'js [&'j DownloadJob], - nhandles: usize +pub struct PhaseDownloadError<'j, T: Download> { + phase: Phase, + inner: Box, + job: &'j T } -impl<'j, 'js> MultiDownloader<'j, 'js> { - pub fn new(jobs: &'js [&'j DownloadJob]) -> MultiDownloader<'j, 'js> { - Self::with_handles(jobs, 8) +impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PhaseDownloadError") + .field("phase", &self.phase) + .field("inner", &self.inner) + .field("job", &self.job) + .finish() } +} + +impl<'j, T: Download> Display for PhaseDownloadError<'j, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner) + } +} + +impl<'j, T: Download> Error for PhaseDownloadError<'j, T> { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&*self.inner) + } +} + +impl<'j, T: Download> PhaseDownloadError<'j, T> { + fn new(phase: Phase, inner: Box, job: &'j T) -> Self { + PhaseDownloadError { + phase, inner, job + } + } +} + +impl MultiDownloader { + pub fn new(jobs: impl IntoIterator) -> MultiDownloader { + Self::with_concurrent(jobs, 8) + } + + pub fn with_concurrent(jobs: impl IntoIterator, n: usize) -> MultiDownloader { + assert!(n > 0); - pub fn with_handles(jobs: &'js [&'j DownloadJob], nhandles: usize) -> MultiDownloader<'j, 'js> { - assert!(nhandles > 0); - MultiDownloader { - jobs, nhandles + jobs: jobs.into_iter().collect(), + client: Client::new(), + nconcurrent: n } } - fn do_it(&self) { - + pub async fn perform(&mut self) -> impl Stream>> { + stream::iter(self.jobs.iter_mut()).map(|job| { + let client = &self.client; + + async move { + let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await { + Ok(opt) => match opt { + Some(rq) => rq, + None => return Ok(()) + }, + Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job)) + }; + + let mut data = match rq.send().await { + Ok(data) => match data.error_for_status() { + Ok(data) => data.bytes_stream(), + Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) + }, + Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) + }; + + while let Some(bytes) = data.next().await { + let bytes = match bytes { + Ok(bytes) => bytes, + Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job)) + }; + + match job.handle_chunk(bytes.as_ref()).await { + Ok(_) => (), + Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job)) + } + } + + job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?; + + Ok(()) + } + }).buffer_unordered(self.nconcurrent) } -} \ No newline at end of file +} -- cgit v1.2.3-70-g09d2