summaryrefslogtreecommitdiffstats
path: root/src/launcher
diff options
context:
space:
mode:
Diffstat (limited to 'src/launcher')
-rw-r--r--src/launcher/download.rs144
-rw-r--r--src/launcher/request.rs139
-rw-r--r--src/launcher/version.rs5
3 files changed, 124 insertions, 164 deletions
diff --git a/src/launcher/download.rs b/src/launcher/download.rs
index 4294d33..3598976 100644
--- a/src/launcher/download.rs
+++ b/src/launcher/download.rs
@@ -1,36 +1,136 @@
-use std::path::{Path, PathBuf};
-use sha1_smol::Digest;
+use std::error::Error;
+use std::fmt::{Debug, Display, Formatter};
+use futures::{stream, Stream, StreamExt};
+use reqwest::{Client, IntoUrl, Method, RequestBuilder};
+use crate::launcher::constants::USER_AGENT;
-pub trait Download {
- fn get_url(&self) -> &str;
- fn get_path(&self) -> &Path;
- fn get_expect_digest(&self) -> Option<Digest>;
- fn get_expect_size(&self) -> Option<usize>;
+pub trait Download: Debug + Display {
+ type URLType: IntoUrl;
+ // return Ok(None) to skip downloading this file
- fn always_redownload(&self) -> bool;
+ fn get_url(&self) -> Self::URLType;
+
+ async fn prepare(&mut self, req: RequestBuilder) -> Result<Option<RequestBuilder>, Box<dyn Error>>;
+ async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box<dyn Error>>;
+ async fn finish(&mut self) -> Result<(), Box<dyn Error>>;
+}
+
+pub struct MultiDownloader<T: Download> {
+ jobs: Vec<T>,
+ client: Client,
+ nconcurrent: usize
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum Phase {
+ Prepare,
+ Send,
+ Receive,
+ HandleChunk,
+ Finish
}
-pub type DownloadJob = dyn Download + Sync + Send;
+impl Display for Phase {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ /* an error occurred while (present participle) ... */
+ Self::Prepare => f.write_str("preparing the request"),
+ Self::Send => f.write_str("sending the request"),
+ Self::Receive => f.write_str("receiving response data"),
+ Self::HandleChunk => f.write_str("handling response data"),
+ Self::Finish => f.write_str("finishing the request"),
+ }
+ }
+}
-pub struct MultiDownloader<'j, 'js> {
- jobs: &'js [&'j DownloadJob],
- nhandles: usize
+pub struct PhaseDownloadError<'j, T: Download> {
+ phase: Phase,
+ inner: Box<dyn Error>,
+ job: &'j T
}
-impl<'j, 'js> MultiDownloader<'j, 'js> {
- pub fn new(jobs: &'js [&'j DownloadJob]) -> MultiDownloader<'j, 'js> {
- Self::with_handles(jobs, 8)
+impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PhaseDownloadError")
+ .field("phase", &self.phase)
+ .field("inner", &self.inner)
+ .field("job", &self.job)
+ .finish()
}
+}
+
+impl<'j, T: Download> Display for PhaseDownloadError<'j, T> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner)
+ }
+}
+
+impl<'j, T: Download> Error for PhaseDownloadError<'j, T> {
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ Some(&*self.inner)
+ }
+}
+
+impl<'j, T: Download> PhaseDownloadError<'j, T> {
+ fn new(phase: Phase, inner: Box<dyn Error>, job: &'j T) -> Self {
+ PhaseDownloadError {
+ phase, inner, job
+ }
+ }
+}
+
+impl<T: Download> MultiDownloader<T> {
+ pub fn new(jobs: impl IntoIterator<Item = T>) -> MultiDownloader<T> {
+ Self::with_concurrent(jobs, 8)
+ }
+
+ pub fn with_concurrent(jobs: impl IntoIterator<Item = T>, n: usize) -> MultiDownloader<T> {
+ assert!(n > 0);
- pub fn with_handles(jobs: &'js [&'j DownloadJob], nhandles: usize) -> MultiDownloader<'j, 'js> {
- assert!(nhandles > 0);
-
MultiDownloader {
- jobs, nhandles
+ jobs: jobs.into_iter().collect(),
+ client: Client::new(),
+ nconcurrent: n
}
}
- fn do_it(&self) {
-
+ pub async fn perform(&mut self) -> impl Stream<Item = Result<(), PhaseDownloadError<T>>> {
+ stream::iter(self.jobs.iter_mut()).map(|job| {
+ let client = &self.client;
+
+ async move {
+ let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await {
+ Ok(opt) => match opt {
+ Some(rq) => rq,
+ None => return Ok(())
+ },
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job))
+ };
+
+ let mut data = match rq.send().await {
+ Ok(data) => match data.error_for_status() {
+ Ok(data) => data.bytes_stream(),
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
+ },
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
+ };
+
+ while let Some(bytes) = data.next().await {
+ let bytes = match bytes {
+ Ok(bytes) => bytes,
+ Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job))
+ };
+
+ match job.handle_chunk(bytes.as_ref()).await {
+ Ok(_) => (),
+ Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job))
+ }
+ }
+
+ job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?;
+
+ Ok(())
+ }
+ }).buffer_unordered(self.nconcurrent)
}
-} \ No newline at end of file
+}
diff --git a/src/launcher/request.rs b/src/launcher/request.rs
deleted file mode 100644
index df89a8b..0000000
--- a/src/launcher/request.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-use std::error::Error;
-use std::fmt::Display;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use curl::easy::{Easy};
-use tokio::sync::oneshot;
-use tokio::sync::oneshot::Receiver;
-use tokio::task;
-use crate::launcher::constants::USER_AGENT;
-
-// yeah this is basically reqwest but bad (I did not want to rely on both reqwest and curl)
-
-#[derive(Clone, Copy)]
-enum FetchState {
- Primed,
- Running,
- Complete
-}
-
-pub struct EasyFetch {
- easy: Option<Easy>,
- state: FetchState,
- response: Option<Receiver<Result<FetchResult, curl::Error>>>
-}
-
-impl EasyFetch {
- fn new(easy: Easy) -> Self {
- EasyFetch {
- easy: Some(easy),
- state: FetchState::Primed,
- response: None
- }
- }
-
- pub fn get(url: &str) -> Self {
- let mut easy = Easy::new();
- easy.useragent(USER_AGENT).expect("couldn't set user agent");
- easy.get(true).expect("couldn't set request method");
- easy.url(url).expect("couldn't set url");
-
- Self::new(easy)
- }
-}
-
-#[derive(Debug)]
-pub struct FetchResult {
- easy: Easy,
- response_code: u32,
- data: Vec<u8>,
-}
-
-#[derive(Debug)]
-pub struct FetchResponseError(u32);
-
-impl FetchResponseError {
- pub fn get_code(&self) -> u32 {
- self.0
- }
-}
-
-impl Display for FetchResponseError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "http response: {}", self.0)
- }
-}
-
-impl Error for FetchResponseError {}
-
-impl FetchResult {
- pub fn get_response_code(&self) -> u32 {
- self.response_code
- }
-
- pub fn get_data(&self) -> &[u8] {
- &self.data
- }
-
- pub fn get_data_string(&self) -> String {
- String::from_utf8_lossy(&self.data).to_string()
- }
-
- pub fn error_for_status(self) -> Result<Self, FetchResponseError> {
- if self.response_code / 100 == 2 {
- Ok(self)
- } else {
- Err(FetchResponseError(self.response_code))
- }
- }
-}
-
-impl Future for EasyFetch {
- type Output = Result<FetchResult, curl::Error>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let self_ref = self.get_mut();
-
- match &self_ref.state {
- FetchState::Primed => {
- self_ref.state = FetchState::Running;
- let mut easy = self_ref.easy.take().unwrap();
- let waker = cx.waker().clone();
-
- let (tx, rx) = oneshot::channel::<Result<FetchResult, curl::Error>>();
- self_ref.response.replace(rx);
-
- task::spawn_blocking(move || {
- let mut out_data: Vec<u8> = Vec::new();
- let mut transfer = easy.transfer();
-
- transfer.write_function(|data| {
- out_data.extend_from_slice(data);
- Ok(data.len())
- }).expect("infallible curl operation failed");
-
- let res = transfer.perform();
- drop(transfer); // have to explicitly drop to release borrow on "easy"
-
- out_data.shrink_to_fit();
-
- tx.send(res.map(|_| FetchResult {
- response_code: easy.response_code().expect("querying response code should not fail"),
- data: out_data,
- easy
- })).expect("curl fetch reader hangup (this shouldn't happen)");
- waker.wake();
- });
-
- Poll::Pending
- },
- FetchState::Running => {
- self_ref.state = FetchState::Complete;
- Poll::Ready(self_ref.response.take().unwrap().try_recv()
- .expect("curl fetch writer hangup or not ready (this shouldn't happen)"))
- },
- FetchState::Complete => panic!("fetch polled after completion")
- }
- }
-} \ No newline at end of file
diff --git a/src/launcher/version.rs b/src/launcher/version.rs
index 0337864..f4cdd6c 100644
--- a/src/launcher/version.rs
+++ b/src/launcher/version.rs
@@ -6,7 +6,6 @@ use std::path::{Path, PathBuf};
use log::{debug, info, warn};
use sha1_smol::Digest;
-use super::request::EasyFetch;
use crate::util;
use crate::version::{*, manifest::*};
@@ -19,7 +18,7 @@ struct RemoteVersionList {
impl RemoteVersionList {
async fn new() -> Result<RemoteVersionList, Box<dyn Error>> {
- let text = EasyFetch::get(URL_VERSION_MANIFEST).await?.error_for_status()?.get_data_string();
+ let text = reqwest::get(URL_VERSION_MANIFEST).await?.error_for_status()?.text().await?;
let manifest: VersionManifest = serde_json::from_str(text.as_str())?;
let mut versions = HashMap::new();
@@ -46,7 +45,7 @@ impl RemoteVersionList {
}
// download it
- let ver_text = EasyFetch::get(ver.url.as_str()).await?.error_for_status()?.get_data_string();
+ let ver_text = reqwest::get(ver.url.as_str()).await?.error_for_status()?.text().await?;
// make sure it's valid
util::verify_sha1(ver.sha1, ver_text.as_str())