You've already forked AstralRinth
forked from didirus/AstralRinth
Modpack support (#60)
* Modpack support * Finish feature * Tauri errors fix (#61) * async impl * working * fmt and redundancy * moved ? to if let Ok block * Finish modpacks support * remove generated file * fix compile err * fix lint * Fix code review comments + forge support --------- Co-authored-by: Wyatt Verchere <wverchere@gmail.com>
This commit is contained in:
@@ -1,76 +1,85 @@
|
||||
//! Functions for fetching infromation from the Internet
|
||||
use crate::config::REQWEST_CLIENT;
|
||||
use futures::prelude::*;
|
||||
use std::{collections::LinkedList, convert::TryInto, path::Path, sync::Arc};
|
||||
use bytes::Bytes;
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::{
|
||||
fs::{self, File},
|
||||
io::AsyncWriteExt,
|
||||
sync::{Semaphore, SemaphorePermit},
|
||||
sync::SemaphorePermit,
|
||||
};
|
||||
|
||||
const FETCH_ATTEMPTS: usize = 3;
|
||||
|
||||
/// Downloads a file with retry and checksum functionality
|
||||
#[tracing::instrument(skip(_permit))]
|
||||
pub async fn fetch<'a>(
|
||||
url: &str,
|
||||
sha1: Option<&str>,
|
||||
_permit: &SemaphorePermit<'a>,
|
||||
) -> crate::Result<bytes::Bytes> {
|
||||
let mut attempts = LinkedList::new();
|
||||
for _ in 0..FETCH_ATTEMPTS {
|
||||
attempts.push_back(
|
||||
async {
|
||||
let content = REQWEST_CLIENT.get(url).send().await?;
|
||||
let bytes = content.bytes().await?;
|
||||
for attempt in 1..=(FETCH_ATTEMPTS + 1) {
|
||||
let result = REQWEST_CLIENT.get(url).send().await;
|
||||
|
||||
if let Some(hash) = sha1 {
|
||||
let actual_hash = sha1_async(bytes.clone()).await;
|
||||
if actual_hash != hash {
|
||||
return Err(crate::ErrorKind::HashError(
|
||||
actual_hash,
|
||||
String::from(hash),
|
||||
)
|
||||
.into());
|
||||
match result {
|
||||
Ok(x) => {
|
||||
let bytes = x.bytes().await;
|
||||
|
||||
if let Ok(bytes) = bytes {
|
||||
if let Some(sha1) = sha1 {
|
||||
let hash = sha1_async(bytes.clone()).await?;
|
||||
if &*hash != sha1 {
|
||||
if attempt <= 3 {
|
||||
continue;
|
||||
} else {
|
||||
return Err(crate::ErrorKind::HashError(
|
||||
sha1.to_string(),
|
||||
hash,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
log::debug!("Done downloading URL {url}");
|
||||
return Ok(bytes);
|
||||
} else if attempt <= 3 {
|
||||
continue;
|
||||
} else if let Err(err) = bytes {
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
.boxed(),
|
||||
)
|
||||
Err(_) if attempt <= 3 => continue,
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!("Done downloading URL {url}");
|
||||
future::select_ok(attempts).map_ok(|it| it.0).await
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
// This is implemented, as it will be useful in porting modpacks
|
||||
// For now, allow it to be dead code
|
||||
#[allow(dead_code)]
|
||||
#[tracing::instrument(skip(sem))]
|
||||
pub async fn fetch_mirrors(
|
||||
urls: &[&str],
|
||||
/// Downloads a file from specified mirrors
|
||||
#[tracing::instrument(skip(permit))]
|
||||
pub async fn fetch_mirrors<'a>(
|
||||
mirrors: &[&str],
|
||||
sha1: Option<&str>,
|
||||
permits: u32,
|
||||
sem: &Semaphore,
|
||||
permit: &SemaphorePermit<'a>,
|
||||
) -> crate::Result<bytes::Bytes> {
|
||||
let _permits = sem.acquire_many(permits).await.unwrap();
|
||||
let sem = Arc::new(Semaphore::new(permits.try_into().unwrap()));
|
||||
if mirrors.is_empty() {
|
||||
return Err(crate::ErrorKind::InputError(
|
||||
"No mirrors provided!".to_string(),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
future::select_ok(urls.iter().map(|url| {
|
||||
let sha1 = sha1.map(String::from);
|
||||
let url = String::from(*url);
|
||||
let sem = Arc::clone(&sem);
|
||||
for (index, mirror) in mirrors.iter().enumerate() {
|
||||
let result = fetch(mirror, sha1, permit).await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let permit = sem.acquire().await.unwrap();
|
||||
fetch(&url, sha1.as_deref(), &permit).await
|
||||
})
|
||||
.map(Result::unwrap)
|
||||
.boxed()
|
||||
}))
|
||||
.await
|
||||
.map(|it| it.0)
|
||||
if result.is_ok() || (result.is_err() && index == (mirrors.len() - 1)) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(bytes, _permit))]
|
||||
@@ -89,8 +98,31 @@ pub async fn write<'a>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sha1_async(bytes: bytes::Bytes) -> String {
|
||||
tokio::task::spawn_blocking(move || sha1::Sha1::from(bytes).hexdigest())
|
||||
.await
|
||||
.unwrap()
|
||||
#[tracing::instrument(skip(bytes, permit))]
|
||||
pub async fn write_cached_icon<'a>(
|
||||
icon_path: &str,
|
||||
cache_dir: &Path,
|
||||
bytes: Bytes,
|
||||
permit: &SemaphorePermit<'a>,
|
||||
) -> crate::Result<PathBuf> {
|
||||
let extension = Path::new(&icon_path).extension().and_then(OsStr::to_str);
|
||||
let hash = sha1_async(bytes.clone()).await?;
|
||||
let path = cache_dir.join("icons").join(if let Some(ext) = extension {
|
||||
format!("{hash}.{ext}")
|
||||
} else {
|
||||
hash
|
||||
});
|
||||
|
||||
write(&path, &bytes, permit).await?;
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
async fn sha1_async(bytes: Bytes) -> crate::Result<String> {
|
||||
let hash = tokio::task::spawn_blocking(move || {
|
||||
sha1::Sha1::from(bytes).hexdigest()
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user