diff options
Diffstat (limited to 'src/launcher')
| -rw-r--r-- | src/launcher/assets.rs | 7 | ||||
| -rw-r--r-- | src/launcher/download.rs | 50 |
2 files changed, 26 insertions, 31 deletions
diff --git a/src/launcher/assets.rs b/src/launcher/assets.rs index e732877..e540e50 100644 --- a/src/launcher/assets.rs +++ b/src/launcher/assets.rs @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf}; use std::path::Component::Normal;
use futures::TryStreamExt;
use log::{debug, info, warn};
+use reqwest::Client;
use sha1_smol::Sha1;
use tokio::{fs, io};
use crate::assets::{Asset, AssetIndex};
@@ -215,15 +216,15 @@ impl AssetRepository { if self.online {
info!("Downloading {} asset objects...", downloads.len());
- let mut multi = MultiDownloader::new(downloads);
- multi.perform().await
+ let client = Client::new();
+ MultiDownloader::with_concurrent(downloads.iter_mut(), 16).perform(&client).await
.inspect_err(|e| warn!("asset download failed: {e}"))
.try_fold((), |_, _| async {Ok(())})
.await
.map_err(|_| AssetError::AssetObjectDownload)?;
} else {
info!("Verifying {} asset objects...", downloads.len());
- super::download::verify_files(downloads).await.map_err(|e| AssetError::AssetVerifyError(e))?;
+ super::download::verify_files(downloads.iter_mut()).await.map_err(|e| AssetError::AssetVerifyError(e))?;
}
Ok(())
diff --git a/src/launcher/download.rs b/src/launcher/download.rs index d8d05f7..f5469d8 100644 --- a/src/launcher/download.rs +++ b/src/launcher/download.rs @@ -22,9 +22,8 @@ pub trait Download: Debug + Display { async fn finish(&mut self) -> Result<(), Box<dyn Error>>; } -pub struct MultiDownloader<T: Download> { - jobs: Vec<T>, - client: Client, +pub struct MultiDownloader<'j, T: Download + 'j, I: Iterator<Item = &'j mut T>> { + jobs: I, nconcurrent: usize } @@ -86,25 +85,22 @@ impl<'j, T: Download> PhaseDownloadError<'j, T> { } } -impl<T: Download> MultiDownloader<T> { - pub fn new(jobs: impl IntoIterator<Item = T>) -> MultiDownloader<T> { +impl<'j, T: Download + 'j, I: Iterator<Item = &'j mut T>> MultiDownloader<'j, T, I> { + pub fn new(jobs: I) -> MultiDownloader<'j, T, I> { Self::with_concurrent(jobs, 8) } - pub fn with_concurrent(jobs: impl IntoIterator<Item = T>, n: usize) -> MultiDownloader<T> { + pub fn with_concurrent(jobs: I, n: usize) -> MultiDownloader<'j, T, I> { assert!(n > 0); MultiDownloader { - jobs: jobs.into_iter().collect(), - client: Client::new(), + jobs, nconcurrent: n } } - pub async fn perform(&mut self) -> impl TryStream<Ok = (), Error = PhaseDownloadError<T>> { - stream::iter(self.jobs.iter_mut()).map(|job| { - let client = &self.client; - + pub async fn perform(self, client: &'j Client) -> impl TryStream<Ok = (), Error = PhaseDownloadError<'j, T>> { + stream::iter(self.jobs.into_iter()).map(move |job| Ok(async move { macro_rules! map_err { ($result:expr, $phase:expr, $job:expr) => { match $result { @@ -114,26 +110,24 @@ impl<T: Download> MultiDownloader<T> { } } - async move { - let Some(rq) = map_err!( - job.prepare(client.request(Method::GET, job.get_url()) - .header(reqwest::header::USER_AGENT, USER_AGENT)).await, Phase::Prepare, job) else { - return Ok(()) - }; + let Some(rq) = map_err!( + job.prepare(client.request(Method::GET, job.get_url()) + .header(reqwest::header::USER_AGENT, USER_AGENT)).await, Phase::Prepare, job) else { + return Ok(()) + }; - let mut data = map_err!(map_err!(rq.send().await, Phase::Send, job).error_for_status(), Phase::Send, job).bytes_stream(); + let mut data = map_err!(map_err!(rq.send().await, Phase::Send, job).error_for_status(), Phase::Send, job).bytes_stream(); - while let Some(bytes) = data.next().await { - let bytes = map_err!(bytes, Phase::Receive, job); + while let Some(bytes) = data.next().await { + let bytes = map_err!(bytes, Phase::Receive, job); - map_err!(job.handle_chunk(bytes.as_ref()).await, Phase::HandleChunk, job); - } + map_err!(job.handle_chunk(bytes.as_ref()).await, Phase::HandleChunk, job); + } - job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?; + job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?; - Ok(()) - } - }).buffer_unordered(self.nconcurrent) + Ok(()) + })).try_buffer_unordered(self.nconcurrent) } } @@ -282,7 +276,7 @@ impl Download for VerifiedDownload { } } -pub async fn verify_files(files: Vec<VerifiedDownload>) -> Result<(), FileVerifyError> { +pub async fn verify_files(files: impl Iterator<Item = &mut VerifiedDownload>) -> Result<(), FileVerifyError> { stream::iter(files) .map(|dl| Ok(async move { debug!("Verifying library {}", dl.get_path().display()); |
