From 9f1a7c45dcee1268f6b222dd2fac4ec083a34d9a Mon Sep 17 00:00:00 2001 From: bigfoot547 Date: Sat, 11 Jan 2025 01:50:48 -0600 Subject: re-add reqwest (mistakes were made) --- src/launcher/download.rs | 144 +++++++++++++++++++++++++++++++++++++++-------- src/launcher/request.rs | 139 --------------------------------------------- src/launcher/version.rs | 5 +- 3 files changed, 124 insertions(+), 164 deletions(-) delete mode 100644 src/launcher/request.rs (limited to 'src/launcher') diff --git a/src/launcher/download.rs b/src/launcher/download.rs index 4294d33..3598976 100644 --- a/src/launcher/download.rs +++ b/src/launcher/download.rs @@ -1,36 +1,136 @@ -use std::path::{Path, PathBuf}; -use sha1_smol::Digest; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use futures::{stream, Stream, StreamExt}; +use reqwest::{Client, IntoUrl, Method, RequestBuilder}; +use crate::launcher::constants::USER_AGENT; -pub trait Download { - fn get_url(&self) -> &str; - fn get_path(&self) -> &Path; - fn get_expect_digest(&self) -> Option; - fn get_expect_size(&self) -> Option; +pub trait Download: Debug + Display { + type URLType: IntoUrl; + // return Ok(None) to skip downloading this file - fn always_redownload(&self) -> bool; + fn get_url(&self) -> Self::URLType; + + async fn prepare(&mut self, req: RequestBuilder) -> Result, Box>; + async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box>; + async fn finish(&mut self) -> Result<(), Box>; +} + +pub struct MultiDownloader { + jobs: Vec, + client: Client, + nconcurrent: usize +} + +#[derive(Debug, Clone, Copy)] +pub enum Phase { + Prepare, + Send, + Receive, + HandleChunk, + Finish } -pub type DownloadJob = dyn Download + Sync + Send; +impl Display for Phase { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + /* an error occurred while (present participle) ... */ + Self::Prepare => f.write_str("preparing the request"), + Self::Send => f.write_str("sending the request"), + Self::Receive => f.write_str("receiving response data"), + Self::HandleChunk => f.write_str("handling response data"), + Self::Finish => f.write_str("finishing the request"), + } + } +} -pub struct MultiDownloader<'j, 'js> { - jobs: &'js [&'j DownloadJob], - nhandles: usize +pub struct PhaseDownloadError<'j, T: Download> { + phase: Phase, + inner: Box, + job: &'j T } -impl<'j, 'js> MultiDownloader<'j, 'js> { - pub fn new(jobs: &'js [&'j DownloadJob]) -> MultiDownloader<'j, 'js> { - Self::with_handles(jobs, 8) +impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PhaseDownloadError") + .field("phase", &self.phase) + .field("inner", &self.inner) + .field("job", &self.job) + .finish() } +} + +impl<'j, T: Download> Display for PhaseDownloadError<'j, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner) + } +} + +impl<'j, T: Download> Error for PhaseDownloadError<'j, T> { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&*self.inner) + } +} + +impl<'j, T: Download> PhaseDownloadError<'j, T> { + fn new(phase: Phase, inner: Box, job: &'j T) -> Self { + PhaseDownloadError { + phase, inner, job + } + } +} + +impl MultiDownloader { + pub fn new(jobs: impl IntoIterator) -> MultiDownloader { + Self::with_concurrent(jobs, 8) + } + + pub fn with_concurrent(jobs: impl IntoIterator, n: usize) -> MultiDownloader { + assert!(n > 0); - pub fn with_handles(jobs: &'js [&'j DownloadJob], nhandles: usize) -> MultiDownloader<'j, 'js> { - assert!(nhandles > 0); - MultiDownloader { - jobs, nhandles + jobs: jobs.into_iter().collect(), + client: Client::new(), + nconcurrent: n } } - fn do_it(&self) { - + pub async fn perform(&mut self) -> impl Stream>> { + stream::iter(self.jobs.iter_mut()).map(|job| { + let client = &self.client; + + async move { + let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await { + Ok(opt) => match opt { + Some(rq) => rq, + None => return Ok(()) + }, + Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job)) + }; + + let mut data = match rq.send().await { + Ok(data) => match data.error_for_status() { + Ok(data) => data.bytes_stream(), + Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) + }, + Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job)) + }; + + while let Some(bytes) = data.next().await { + let bytes = match bytes { + Ok(bytes) => bytes, + Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job)) + }; + + match job.handle_chunk(bytes.as_ref()).await { + Ok(_) => (), + Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job)) + } + } + + job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?; + + Ok(()) + } + }).buffer_unordered(self.nconcurrent) } -} \ No newline at end of file +} diff --git a/src/launcher/request.rs b/src/launcher/request.rs deleted file mode 100644 index df89a8b..0000000 --- a/src/launcher/request.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::error::Error; -use std::fmt::Display; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use curl::easy::{Easy}; -use tokio::sync::oneshot; -use tokio::sync::oneshot::Receiver; -use tokio::task; -use crate::launcher::constants::USER_AGENT; - -// yeah this is basically reqwest but bad (I did not want to rely on both reqwest and curl) - -#[derive(Clone, Copy)] -enum FetchState { - Primed, - Running, - Complete -} - -pub struct EasyFetch { - easy: Option, - state: FetchState, - response: Option>> -} - -impl EasyFetch { - fn new(easy: Easy) -> Self { - EasyFetch { - easy: Some(easy), - state: FetchState::Primed, - response: None - } - } - - pub fn get(url: &str) -> Self { - let mut easy = Easy::new(); - easy.useragent(USER_AGENT).expect("couldn't set user agent"); - easy.get(true).expect("couldn't set request method"); - easy.url(url).expect("couldn't set url"); - - Self::new(easy) - } -} - -#[derive(Debug)] -pub struct FetchResult { - easy: Easy, - response_code: u32, - data: Vec, -} - -#[derive(Debug)] -pub struct FetchResponseError(u32); - -impl FetchResponseError { - pub fn get_code(&self) -> u32 { - self.0 - } -} - -impl Display for FetchResponseError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "http response: {}", self.0) - } -} - -impl Error for FetchResponseError {} - -impl FetchResult { - pub fn get_response_code(&self) -> u32 { - self.response_code - } - - pub fn get_data(&self) -> &[u8] { - &self.data - } - - pub fn get_data_string(&self) -> String { - String::from_utf8_lossy(&self.data).to_string() - } - - pub fn error_for_status(self) -> Result { - if self.response_code / 100 == 2 { - Ok(self) - } else { - Err(FetchResponseError(self.response_code)) - } - } -} - -impl Future for EasyFetch { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let self_ref = self.get_mut(); - - match &self_ref.state { - FetchState::Primed => { - self_ref.state = FetchState::Running; - let mut easy = self_ref.easy.take().unwrap(); - let waker = cx.waker().clone(); - - let (tx, rx) = oneshot::channel::>(); - self_ref.response.replace(rx); - - task::spawn_blocking(move || { - let mut out_data: Vec = Vec::new(); - let mut transfer = easy.transfer(); - - transfer.write_function(|data| { - out_data.extend_from_slice(data); - Ok(data.len()) - }).expect("infallible curl operation failed"); - - let res = transfer.perform(); - drop(transfer); // have to explicitly drop to release borrow on "easy" - - out_data.shrink_to_fit(); - - tx.send(res.map(|_| FetchResult { - response_code: easy.response_code().expect("querying response code should not fail"), - data: out_data, - easy - })).expect("curl fetch reader hangup (this shouldn't happen)"); - waker.wake(); - }); - - Poll::Pending - }, - FetchState::Running => { - self_ref.state = FetchState::Complete; - Poll::Ready(self_ref.response.take().unwrap().try_recv() - .expect("curl fetch writer hangup or not ready (this shouldn't happen)")) - }, - FetchState::Complete => panic!("fetch polled after completion") - } - } -} \ No newline at end of file diff --git a/src/launcher/version.rs b/src/launcher/version.rs index 0337864..f4cdd6c 100644 --- a/src/launcher/version.rs +++ b/src/launcher/version.rs @@ -6,7 +6,6 @@ use std::path::{Path, PathBuf}; use log::{debug, info, warn}; use sha1_smol::Digest; -use super::request::EasyFetch; use crate::util; use crate::version::{*, manifest::*}; @@ -19,7 +18,7 @@ struct RemoteVersionList { impl RemoteVersionList { async fn new() -> Result> { - let text = EasyFetch::get(URL_VERSION_MANIFEST).await?.error_for_status()?.get_data_string(); + let text = reqwest::get(URL_VERSION_MANIFEST).await?.error_for_status()?.text().await?; let manifest: VersionManifest = serde_json::from_str(text.as_str())?; let mut versions = HashMap::new(); @@ -46,7 +45,7 @@ impl RemoteVersionList { } // download it - let ver_text = EasyFetch::get(ver.url.as_str()).await?.error_for_status()?.get_data_string(); + let ver_text = reqwest::get(ver.url.as_str()).await?.error_for_status()?.text().await?; // make sure it's valid util::verify_sha1(ver.sha1, ver_text.as_str()) -- cgit v1.2.3-70-g09d2