summaryrefslogtreecommitdiffstats
path: root/src/launcher/download.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/launcher/download.rs')
-rw-r--r--src/launcher/download.rs144
1 files changed, 122 insertions, 22 deletions
diff --git a/src/launcher/download.rs b/src/launcher/download.rs
index 4294d33..3598976 100644
--- a/src/launcher/download.rs
+++ b/src/launcher/download.rs
@@ -1,36 +1,136 @@
-use std::path::{Path, PathBuf};
-use sha1_smol::Digest;
+use std::error::Error;
+use std::fmt::{Debug, Display, Formatter};
+use futures::{stream, Stream, StreamExt};
+use reqwest::{Client, IntoUrl, Method, RequestBuilder};
+use crate::launcher::constants::USER_AGENT;
-pub trait Download {
- fn get_url(&self) -> &str;
- fn get_path(&self) -> &Path;
- fn get_expect_digest(&self) -> Option<Digest>;
- fn get_expect_size(&self) -> Option<usize>;
+pub trait Download: Debug + Display {
+ type URLType: IntoUrl;
+ // return Ok(None) to skip downloading this file
- fn always_redownload(&self) -> bool;
+ fn get_url(&self) -> Self::URLType;
+
+ async fn prepare(&mut self, req: RequestBuilder) -> Result<Option<RequestBuilder>, Box<dyn Error>>;
+ async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box<dyn Error>>;
+ async fn finish(&mut self) -> Result<(), Box<dyn Error>>;
+}
+
+pub struct MultiDownloader<T: Download> {
+ jobs: Vec<T>,
+ client: Client,
+ nconcurrent: usize
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum Phase {
+ Prepare,
+ Send,
+ Receive,
+ HandleChunk,
+ Finish
}
-pub type DownloadJob = dyn Download + Sync + Send;
+impl Display for Phase {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ /* an error occurred while (present participle) ... */
+ Self::Prepare => f.write_str("preparing the request"),
+ Self::Send => f.write_str("sending the request"),
+ Self::Receive => f.write_str("receiving response data"),
+ Self::HandleChunk => f.write_str("handling response data"),
+ Self::Finish => f.write_str("finishing the request"),
+ }
+ }
+}
-pub struct MultiDownloader<'j, 'js> {
- jobs: &'js [&'j DownloadJob],
- nhandles: usize
+pub struct PhaseDownloadError<'j, T: Download> {
+ phase: Phase,
+ inner: Box<dyn Error>,
+ job: &'j T
}
-impl<'j, 'js> MultiDownloader<'j, 'js> {
- pub fn new(jobs: &'js [&'j DownloadJob]) -> MultiDownloader<'j, 'js> {
- Self::with_handles(jobs, 8)
+impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PhaseDownloadError")
+ .field("phase", &self.phase)
+ .field("inner", &self.inner)
+ .field("job", &self.job)
+ .finish()
}
+}
+
+impl<'j, T: Download> Display for PhaseDownloadError<'j, T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner)
+ }
+}
+
+impl<'j, T: Download> Error for PhaseDownloadError<'j, T> {
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ Some(&*self.inner)
+ }
+}
+
+impl<'j, T: Download> PhaseDownloadError<'j, T> {
+ fn new(phase: Phase, inner: Box<dyn Error>, job: &'j T) -> Self {
+ PhaseDownloadError {
+ phase, inner, job
+ }
+ }
+}
+
+impl<T: Download> MultiDownloader<T> {
+ pub fn new(jobs: impl IntoIterator<Item = T>) -> MultiDownloader<T> {
+ Self::with_concurrent(jobs, 8)
+ }
+
+ pub fn with_concurrent(jobs: impl IntoIterator<Item = T>, n: usize) -> MultiDownloader<T> {
+ assert!(n > 0);
- pub fn with_handles(jobs: &'js [&'j DownloadJob], nhandles: usize) -> MultiDownloader<'j, 'js> {
- assert!(nhandles > 0);
-
MultiDownloader {
- jobs, nhandles
+ jobs: jobs.into_iter().collect(),
+ client: Client::new(),
+ nconcurrent: n
}
}
- fn do_it(&self) {
-
+ pub async fn perform(&mut self) -> impl Stream<Item = Result<(), PhaseDownloadError<T>>> {
+ stream::iter(self.jobs.iter_mut()).map(|job| {
+ let client = &self.client;
+
+ async move {
+ let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await {
+ Ok(opt) => match opt {
+ Some(rq) => rq,
+ None => return Ok(())
+ },
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job))
+ };
+
+ let mut data = match rq.send().await {
+ Ok(data) => match data.error_for_status() {
+ Ok(data) => data.bytes_stream(),
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
+ },
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
+ };
+
+ while let Some(bytes) = data.next().await {
+ let bytes = match bytes {
+ Ok(bytes) => bytes,
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job))
+ };
+
+ match job.handle_chunk(bytes.as_ref()).await {
+ Ok(_) => (),
+ Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job))
+ }
+ }
+
+ job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?;
+
+ Ok(())
+ }
+ }).buffer_unordered(self.nconcurrent)
}
-} \ No newline at end of file
+}