summaryrefslogtreecommitdiffstats
path: root/src/launcher
diff options
context:
space:
mode:
Diffstat (limited to 'src/launcher')
-rw-r--r--src/launcher/assets.rs7
-rw-r--r--src/launcher/download.rs50
2 files changed, 26 insertions, 31 deletions
diff --git a/src/launcher/assets.rs b/src/launcher/assets.rs
index e732877..e540e50 100644
--- a/src/launcher/assets.rs
+++ b/src/launcher/assets.rs
@@ -6,6 +6,7 @@ use std::path::{Path, PathBuf};
use std::path::Component::Normal;
use futures::TryStreamExt;
use log::{debug, info, warn};
+use reqwest::Client;
use sha1_smol::Sha1;
use tokio::{fs, io};
use crate::assets::{Asset, AssetIndex};
@@ -215,15 +216,15 @@ impl AssetRepository {
if self.online {
info!("Downloading {} asset objects...", downloads.len());
- let mut multi = MultiDownloader::new(downloads);
- multi.perform().await
+ let client = Client::new();
+ MultiDownloader::with_concurrent(downloads.iter_mut(), 16).perform(&client).await
.inspect_err(|e| warn!("asset download failed: {e}"))
.try_fold((), |_, _| async {Ok(())})
.await
.map_err(|_| AssetError::AssetObjectDownload)?;
} else {
info!("Verifying {} asset objects...", downloads.len());
- super::download::verify_files(downloads).await.map_err(|e| AssetError::AssetVerifyError(e))?;
+ super::download::verify_files(downloads.iter_mut()).await.map_err(|e| AssetError::AssetVerifyError(e))?;
}
Ok(())
diff --git a/src/launcher/download.rs b/src/launcher/download.rs
index d8d05f7..f5469d8 100644
--- a/src/launcher/download.rs
+++ b/src/launcher/download.rs
@@ -22,9 +22,8 @@ pub trait Download: Debug + Display {
async fn finish(&mut self) -> Result<(), Box<dyn Error>>;
}
-pub struct MultiDownloader<T: Download> {
- jobs: Vec<T>,
- client: Client,
+pub struct MultiDownloader<'j, T: Download + 'j, I: Iterator<Item = &'j mut T>> {
+ jobs: I,
nconcurrent: usize
}
@@ -86,25 +85,22 @@ impl<'j, T: Download> PhaseDownloadError<'j, T> {
}
}
-impl<T: Download> MultiDownloader<T> {
- pub fn new(jobs: impl IntoIterator<Item = T>) -> MultiDownloader<T> {
+impl<'j, T: Download + 'j, I: Iterator<Item = &'j mut T>> MultiDownloader<'j, T, I> {
+ pub fn new(jobs: I) -> MultiDownloader<'j, T, I> {
Self::with_concurrent(jobs, 8)
}
- pub fn with_concurrent(jobs: impl IntoIterator<Item = T>, n: usize) -> MultiDownloader<T> {
+ pub fn with_concurrent(jobs: I, n: usize) -> MultiDownloader<'j, T, I> {
assert!(n > 0);
MultiDownloader {
- jobs: jobs.into_iter().collect(),
- client: Client::new(),
+ jobs,
nconcurrent: n
}
}
- pub async fn perform(&mut self) -> impl TryStream<Ok = (), Error = PhaseDownloadError<T>> {
- stream::iter(self.jobs.iter_mut()).map(|job| {
- let client = &self.client;
-
+ pub async fn perform(self, client: &'j Client) -> impl TryStream<Ok = (), Error = PhaseDownloadError<'j, T>> {
+ stream::iter(self.jobs.into_iter()).map(move |job| Ok(async move {
macro_rules! map_err {
($result:expr, $phase:expr, $job:expr) => {
match $result {
@@ -114,26 +110,24 @@ impl<T: Download> MultiDownloader<T> {
}
}
- async move {
- let Some(rq) = map_err!(
- job.prepare(client.request(Method::GET, job.get_url())
- .header(reqwest::header::USER_AGENT, USER_AGENT)).await, Phase::Prepare, job) else {
- return Ok(())
- };
+ let Some(rq) = map_err!(
+ job.prepare(client.request(Method::GET, job.get_url())
+ .header(reqwest::header::USER_AGENT, USER_AGENT)).await, Phase::Prepare, job) else {
+ return Ok(())
+ };
- let mut data = map_err!(map_err!(rq.send().await, Phase::Send, job).error_for_status(), Phase::Send, job).bytes_stream();
+ let mut data = map_err!(map_err!(rq.send().await, Phase::Send, job).error_for_status(), Phase::Send, job).bytes_stream();
- while let Some(bytes) = data.next().await {
- let bytes = map_err!(bytes, Phase::Receive, job);
+ while let Some(bytes) = data.next().await {
+ let bytes = map_err!(bytes, Phase::Receive, job);
- map_err!(job.handle_chunk(bytes.as_ref()).await, Phase::HandleChunk, job);
- }
+ map_err!(job.handle_chunk(bytes.as_ref()).await, Phase::HandleChunk, job);
+ }
- job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?;
+ job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?;
- Ok(())
- }
- }).buffer_unordered(self.nconcurrent)
+ Ok(())
+ })).try_buffer_unordered(self.nconcurrent)
}
}
@@ -282,7 +276,7 @@ impl Download for VerifiedDownload {
}
}
-pub async fn verify_files(files: Vec<VerifiedDownload>) -> Result<(), FileVerifyError> {
+pub async fn verify_files(files: impl Iterator<Item = &mut VerifiedDownload>) -> Result<(), FileVerifyError> {
stream::iter(files)
.map(|dl| Ok(async move {
debug!("Verifying library {}", dl.get_path().display());