Add limiter for forge downloading

This commit is contained in:
Jai A
2023-04-04 21:17:19 -07:00
parent f66fc06b4f
commit 9754f2d1c5
4 changed files with 59 additions and 17 deletions

1
.env
View File

@@ -1,7 +1,6 @@
RUST_LOG=info,error RUST_LOG=info,error
BASE_URL=https://modrinth-cdn-staging.nyc3.digitaloceanspaces.com BASE_URL=https://modrinth-cdn-staging.nyc3.digitaloceanspaces.com
BASE_FOLDER=gamedata
S3_ACCESS_TOKEN=none S3_ACCESS_TOKEN=none
S3_SECRET=none S3_SECRET=none

View File

@@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Read; use std::io::Read;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
lazy_static! { lazy_static! {
@@ -458,7 +458,22 @@ pub async fn retrieve_data(
}.await }.await
}); });
futures::future::try_join_all(loaders_futures).await?; {
let mut versions = loaders_futures.into_iter().peekable();
let mut chunk_index = 0;
while versions.peek().is_some() {
let now = Instant::now();
let chunk: Vec<_> = versions.by_ref().take(1).collect();
futures::future::try_join_all(chunk).await?;
chunk_index += 1;
let elapsed = now.elapsed();
info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed);
}
}
//futures::future::try_join_all(loaders_futures).await?;
} }
versions.lock().await.push(daedalus::modded::Version { versions.lock().await.push(daedalus::modded::Version {
@@ -472,7 +487,22 @@ pub async fn retrieve_data(
} }
} }
futures::future::try_join_all(version_futures).await?; {
let mut versions = version_futures.into_iter().peekable();
let mut chunk_index = 0;
while versions.peek().is_some() {
let now = Instant::now();
let chunk: Vec<_> = versions.by_ref().take(10).collect();
futures::future::try_join_all(chunk).await?;
chunk_index += 1;
let elapsed = now.elapsed();
info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed);
}
}
//futures::future::try_join_all(version_futures).await?;
if let Ok(versions) = Arc::try_unwrap(versions) { if let Ok(versions) = Arc::try_unwrap(versions) {
let mut versions = versions.into_inner(); let mut versions = versions.into_inner();

View File

@@ -1,4 +1,4 @@
use log::{error, warn}; use log::{error, info, warn};
use s3::creds::Credentials; use s3::creds::Credentials;
use s3::error::S3Error; use s3::error::S3Error;
use s3::{Bucket, Region}; use s3::{Bucket, Region};
@@ -112,7 +112,6 @@ fn check_env_vars() -> bool {
} }
failed |= check_var::<String>("BASE_URL"); failed |= check_var::<String>("BASE_URL");
failed |= check_var::<String>("BASE_FOLDER");
failed |= check_var::<String>("S3_ACCESS_TOKEN"); failed |= check_var::<String>("S3_ACCESS_TOKEN");
failed |= check_var::<String>("S3_SECRET"); failed |= check_var::<String>("S3_SECRET");
@@ -154,7 +153,8 @@ pub async fn upload_file_to_bucket(
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let _permit = semaphore.acquire().await?; let _permit = semaphore.acquire().await?;
let key = format!("{}/{}", &*dotenvy::var("BASE_FOLDER").unwrap(), path); info!("{} started uploading", path);
let key = path.clone();
for attempt in 1..=4 { for attempt in 1..=4 {
let result = if let Some(ref content_type) = content_type { let result = if let Some(ref content_type) = content_type {
@@ -166,16 +166,13 @@ pub async fn upload_file_to_bucket(
} }
.map_err(|err| Error::S3Error { .map_err(|err| Error::S3Error {
inner: err, inner: err,
file: format!( file: path.clone(),
"{}/{}",
&*dotenvy::var("BASE_FOLDER").unwrap(),
path
),
}); });
match result { match result {
Ok(_) => { Ok(_) => {
{ {
info!("{} done uploading", path);
let mut uploaded_files = uploaded_files.lock().await; let mut uploaded_files = uploaded_files.lock().await;
uploaded_files.push(key); uploaded_files.push(key);
} }
@@ -188,15 +185,13 @@ pub async fn upload_file_to_bucket(
} }
} }
} }
unreachable!() unreachable!()
} }
pub fn format_url(path: &str) -> String { pub fn format_url(path: &str) -> String {
format!( format!(
"{}/{}/{}", "{}/{}",
&*dotenvy::var("BASE_URL").unwrap(), &*dotenvy::var("BASE_URL").unwrap(),
&*dotenvy::var("BASE_FOLDER").unwrap(),
path path
) )
} }
@@ -207,7 +202,9 @@ pub async fn download_file(
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
) -> Result<bytes::Bytes, Error> { ) -> Result<bytes::Bytes, Error> {
let _permit = semaphore.acquire().await?; let _permit = semaphore.acquire().await?;
info!("{} started downloading", url);
let val = daedalus::download_file(url, sha1).await?; let val = daedalus::download_file(url, sha1).await?;
info!("{} finished downloading", url);
Ok(val) Ok(val)
} }
@@ -218,8 +215,9 @@ pub async fn download_file_mirrors(
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
) -> Result<bytes::Bytes, Error> { ) -> Result<bytes::Bytes, Error> {
let _permit = semaphore.acquire().await?; let _permit = semaphore.acquire().await?;
info!("{} started downloading", base);
let val = daedalus::download_file_mirrors(base, mirrors, sha1).await?; let val = daedalus::download_file_mirrors(base, mirrors, sha1).await?;
info!("{} finished downloading", base);
Ok(val) Ok(val)
} }

View File

@@ -147,7 +147,22 @@ pub async fn retrieve_data(
}) })
} }
futures::future::try_join_all(version_futures).await?; {
let mut versions = version_futures.into_iter().peekable();
let mut chunk_index = 0;
while versions.peek().is_some() {
let now = Instant::now();
let chunk: Vec<_> = versions.by_ref().take(100).collect();
futures::future::try_join_all(chunk).await?;
chunk_index += 1;
let elapsed = now.elapsed();
info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed);
}
}
//futures::future::try_join_all(version_futures).await?;
upload_file_to_bucket( upload_file_to_bucket(
format!( format!(