use std::cmp::min; use std::collections::VecDeque; use std::error::Error; use std::future::Future; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use curl::easy::{Easy2, Handler, WriteError}; use curl::multi::{Easy2Handle, EasyHandle, Multi}; use tokio::task::spawn_blocking; use crate::launcher::constants::USER_AGENT; trait Download { async fn prepare(&mut self, easy: &EasyHandle) -> Result>; async fn handle_chunk(&mut self, data: &[u8]) -> Result<(), Box>; async fn finish(&mut self) -> Result<(), Box>; } #[derive(Clone, Copy)] enum MultiDownloaderState { Primed, Running, Complete } struct MultiDownloader where T: Download + Send + Unpin + 'static { state: MultiDownloaderState, nhandles: usize, jobs: Option> } impl MultiDownloader where T: Download + Send + Unpin + 'static { // TODO: this interface is kind of weird (we have to take ownership of the jobs, but maybe this isn't the best way to do that) pub fn new(jobs: Vec, nhandles: usize) -> MultiDownloader { assert!(nhandles > 0); MultiDownloader { state: MultiDownloaderState::Primed, nhandles, jobs: Some(jobs.into()) } } } struct MultiDownloadResult { } impl Future for MultiDownloader where T: Download + Send + Unpin + 'static { type Output = MultiDownloadResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let self_mut = self.get_mut(); match self_mut.state { MultiDownloaderState::Primed => { self_mut.state = MultiDownloaderState::Running; let jobs = self_mut.jobs.take().unwrap(); let nhandles = self_mut.nhandles; spawn_blocking(move || { // TODO MultiDownloadBlocking::new(jobs, nhandles).perform(); }); Poll::Pending }, MultiDownloaderState::Running => { Poll::Pending // TODO }, MultiDownloaderState::Complete => panic!("multi download polled after completion") } } } struct MultiHandler { job: Option } impl Handler for MultiHandler { fn write(&mut self, data: &[u8]) -> Result { todo!() } } struct MultiDownloadBlocking { multi: Multi, easy_handles: Vec>>, jobs: VecDeque } impl<'a, T: Download> MultiDownloadBlocking { fn new(jobs: VecDeque, nhandles: usize) -> MultiDownloadBlocking { assert!(nhandles > 0); let nhandles = min(nhandles, jobs.len()); let multi = Multi::new(); let mut easy_handles = Vec::with_capacity(nhandles); for n in 0..nhandles { let mut easy = Easy2::new(MultiHandler { job: None }); easy.useragent(USER_AGENT).expect("setting user agent shouldn't fail"); let mut handle = multi.add2(easy).expect("adding easy handle cannot fail"); handle.set_token(n).expect("setting token cannot fail"); easy_handles.push(handle); } MultiDownloadBlocking { multi, easy_handles, jobs } } fn prepare_job(&mut self, easy: &mut Easy2Handle>, job: T) { let handler = easy.get_mut(); todo!() } fn perform(&mut self) -> MultiDownloadResult { todo!() } } // // pub struct MultiDownloader<'j> { // state: MultiDownloaderState, // jobs: VecDeque, // multi: Multi, // handles: Vec>> // } // // pub struct EasyHandler<'j> { // job: Option<&'j DownloadJob> // } // // impl<'j> Handler for EasyHandler<'j> { // fn write(&mut self, data: &[u8]) -> Result { // // } // } // // impl<'j, 'js, T> MultiDownloader<'j, 'js, T> // where // T: Download + Sync + Send + ?Sized // { // pub fn new(jobs: &'js [&'j T]) -> MultiDownloader<'j, 'js, T> { // Self::with_handles(jobs, 8) // } // // pub fn with_handles(jobs: &'js [&'j T], nhandles: usize) -> MultiDownloader<'j, 'js, T> { // assert!(nhandles > 0); // // let mut handles = Vec::with_capacity(nhandles); // let multi = Multi::new(); // // for n in 0..nhandles { // let mut easy = multi.add2(Easy2::new(EasyHandler { job: None })).expect("adding easy handle should not fail"); // easy.set_token(n).expect("setting token should not fail"); // // handles.push(easy); // } // // MultiDownloader { // state: MultiDownloaderState::Primed, // // multi, // handles // } // } // // fn next_job(&mut self) -> Option<&T> { // let oj = self.jobs.get(self.job_idx).cloned(); // self.job_idx += 1; // oj // } // } // // pub struct MultiDownloadResult { // // } // // impl<'j, 'js, T> Future for MultiDownloader<'j, 'js, T> // where // T: Download + Sync + Send + ?Sized // { // type Output = MultiDownloadResult; // // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // let self_mut = self.get_mut(); // match self_mut.state { // MultiDownloaderState::Primed => { // self_mut.state = MultiDownloaderState::Running; // // // TODO: assign download jobs to each // // Poll::Pending // }, // MultiDownloaderState::Running => { // todo!() // }, // MultiDownloaderState::Complete => panic!("multi downloader polled after completion") // } // } // } // fn sample() { // struct AbcdDownloader { // file: Option, // path: PathBuf, // url: String, // expect_size: usize, // expect_sha1: Digest // } // // impl Download for AbcdDownloader { // async fn before_download(&mut self) -> Result> { // let mut file = match File::open(&self.path).await { // Ok(f) => f, // Err(e) => return match e.kind() { // ErrorKind::NotFound => { // // TODO: ensure parent dirs exist // // Ok(true) // }, // _ => Err(e.into()) // } // }; // // let mut buf = [0u8; 4096]; // let mut sha1 = Sha1::new(); // let mut tally = 0usize; // // loop { // let n = file.read(&mut buf[..]).await?; // if n == 0 { // file.seek(SeekFrom::Start(0)).await?; // break; // } // // sha1.update(&buf[..n]); // tally += n; // } // // Ok(tally != self.expect_size || sha1.digest() != self.expect_sha1) // } // // async fn handle_chunk(&mut self, data: &[u8]) -> Result { // self.file.unwrap().write_all(data).await // } // // async fn after_download(&mut self) -> Result<(), Box> { // todo!() // } // } // // let mut jobs = Vec::new(); // jobs.push(AbcdDownloader { // file: None, // path: PathBuf::from("/home/bigfoot/test.words"), // url: String::from("https://github.com/"), // expect_size: 10, // expect_sha1: Digest::default() // }); // // let abcd = MultiDownloader::new(jobs.iter().map(|a| a).collect::>().as_slice()); // }