1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use futures::{stream, Stream, StreamExt};
use reqwest::{Client, IntoUrl, Method, RequestBuilder};
use crate::launcher::constants::USER_AGENT;
pub trait Download: Debug + Display {
type URLType: IntoUrl;
// return Ok(None) to skip downloading this file
fn get_url(&self) -> Self::URLType;
async fn prepare(&mut self, req: RequestBuilder) -> Result<Option<RequestBuilder>, Box<dyn Error>>;
async fn handle_chunk(&mut self, chunk: &[u8]) -> Result<(), Box<dyn Error>>;
async fn finish(&mut self) -> Result<(), Box<dyn Error>>;
}
pub struct MultiDownloader<T: Download> {
jobs: Vec<T>,
client: Client,
nconcurrent: usize
}
#[derive(Debug, Clone, Copy)]
pub enum Phase {
Prepare,
Send,
Receive,
HandleChunk,
Finish
}
impl Display for Phase {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
/* an error occurred while (present participle) ... */
Self::Prepare => f.write_str("preparing the request"),
Self::Send => f.write_str("sending the request"),
Self::Receive => f.write_str("receiving response data"),
Self::HandleChunk => f.write_str("handling response data"),
Self::Finish => f.write_str("finishing the request"),
}
}
}
pub struct PhaseDownloadError<'j, T: Download> {
phase: Phase,
inner: Box<dyn Error>,
job: &'j T
}
impl<'j, T: Download> Debug for PhaseDownloadError<'j, T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PhaseDownloadError")
.field("phase", &self.phase)
.field("inner", &self.inner)
.field("job", &self.job)
.finish()
}
}
impl<'j, T: Download> Display for PhaseDownloadError<'j, T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "error while {} ({}): {}", self.phase, self.job, self.inner)
}
}
impl<'j, T: Download> Error for PhaseDownloadError<'j, T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&*self.inner)
}
}
impl<'j, T: Download> PhaseDownloadError<'j, T> {
fn new(phase: Phase, inner: Box<dyn Error>, job: &'j T) -> Self {
PhaseDownloadError {
phase, inner, job
}
}
}
impl<T: Download> MultiDownloader<T> {
pub fn new(jobs: impl IntoIterator<Item = T>) -> MultiDownloader<T> {
Self::with_concurrent(jobs, 8)
}
pub fn with_concurrent(jobs: impl IntoIterator<Item = T>, n: usize) -> MultiDownloader<T> {
assert!(n > 0);
MultiDownloader {
jobs: jobs.into_iter().collect(),
client: Client::new(),
nconcurrent: n
}
}
pub async fn perform(&mut self) -> impl Stream<Item = Result<(), PhaseDownloadError<T>>> {
stream::iter(self.jobs.iter_mut()).map(|job| {
let client = &self.client;
async move {
let rq = match job.prepare(client.request(Method::GET, job.get_url()).header(reqwest::header::USER_AGENT, USER_AGENT)).await {
Ok(opt) => match opt {
Some(rq) => rq,
None => return Ok(())
},
Err(e) => return Err(PhaseDownloadError::new(Phase::Prepare, e, job))
};
let mut data = match rq.send().await {
Ok(data) => match data.error_for_status() {
Ok(data) => data.bytes_stream(),
Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
},
Err(e) => return Err(PhaseDownloadError::new(Phase::Send, e.into(), job))
};
while let Some(bytes) = data.next().await {
let bytes = match bytes {
Ok(bytes) => bytes,
Err(e) => return Err(PhaseDownloadError::new(Phase::Receive, e.into(), job))
};
match job.handle_chunk(bytes.as_ref()).await {
Ok(_) => (),
Err(e) => return Err(PhaseDownloadError::new(Phase::HandleChunk, e.into(), job))
}
}
job.finish().await.map_err(|e| PhaseDownloadError::new(Phase::Finish, e.into(), job))?;
Ok(())
}
}).buffer_unordered(self.nconcurrent)
}
}
|