Fix issues

This commit is contained in:
Jai A
2023-04-04 20:25:17 -07:00
parent b7e2d7fb8e
commit 79ceb56c60
9 changed files with 740 additions and 624 deletions

View File

@@ -1,21 +1,26 @@
use crate::download_file;
use crate::{format_url, upload_file_to_bucket, Error};
use daedalus::download_file;
use daedalus::minecraft::VersionManifest;
use log::info;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use std::time::Instant;
use tokio::sync::{Mutex, Semaphore};
pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionManifest, Error> {
let old_manifest =
daedalus::minecraft::fetch_version_manifest(Some(&*crate::format_url(&format!(
pub async fn retrieve_data(
uploaded_files: &mut Vec<String>,
semaphore: Arc<Semaphore>,
) -> Result<VersionManifest, Error> {
let old_manifest = daedalus::minecraft::fetch_version_manifest(Some(
&*format_url(&format!(
"minecraft/v{}/manifest.json",
daedalus::minecraft::CURRENT_FORMAT_VERSION
))))
.await
.ok();
)),
))
.await
.ok();
let mut manifest = daedalus::minecraft::fetch_version_manifest(None).await?;
let mut manifest =
daedalus::minecraft::fetch_version_manifest(None).await?;
let cloned_manifest = Arc::new(Mutex::new(manifest.clone()));
let visited_assets_mutex = Arc::new(Mutex::new(Vec::new()));
@@ -42,13 +47,16 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
let visited_assets_mutex = Arc::clone(&visited_assets_mutex);
let cloned_manifest_mutex = Arc::clone(&cloned_manifest);
let uploaded_files_mutex = Arc::clone(&uploaded_files_mutex);
let semaphore = Arc::clone(&semaphore);
let assets_hash = old_version.and_then(|x| x.assets_index_sha1.clone());
let assets_hash =
old_version.and_then(|x| x.assets_index_sha1.clone());
async move {
let mut upload_futures = Vec::new();
let version_info = daedalus::minecraft::fetch_version_info(version).await?;
let version_info =
daedalus::minecraft::fetch_version_info(version).await?;
let version_path = format!(
"minecraft/v{}/versions/{}.json",
@@ -63,14 +71,16 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
let assets_index_url = version_info.asset_index.url.clone();
{
let mut cloned_manifest = cloned_manifest_mutex.lock().await;
let mut cloned_manifest =
cloned_manifest_mutex.lock().await;
let position = cloned_manifest
.versions
.iter()
.position(|x| version.id == x.id)
.unwrap();
cloned_manifest.versions[position].url = format_url(&version_path);
cloned_manifest.versions[position].url =
format_url(&version_path);
cloned_manifest.versions[position].assets_index_sha1 =
Some(version_info.asset_index.sha1.clone());
cloned_manifest.versions[position].assets_index_url =
@@ -93,14 +103,18 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
}
if download_assets {
visited_assets.push(version_info.asset_index.id.clone());
visited_assets
.push(version_info.asset_index.id.clone());
}
}
if download_assets {
let assets_index =
download_file(&assets_index_url, Some(&version_info.asset_index.sha1))
.await?;
let assets_index = download_file(
&assets_index_url,
Some(&version_info.asset_index.sha1),
semaphore.clone(),
)
.await?;
{
upload_futures.push(upload_file_to_bucket(
@@ -108,6 +122,7 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
assets_index.to_vec(),
Some("application/json".to_string()),
uploaded_files_mutex.as_ref(),
semaphore.clone(),
));
}
}
@@ -118,6 +133,7 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
serde_json::to_vec(&version_info)?,
Some("application/json".to_string()),
uploaded_files_mutex.as_ref(),
semaphore.clone(),
));
}
@@ -131,23 +147,7 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
})
}
{
let mut versions = version_futures.into_iter().peekable();
let mut chunk_index = 0;
while versions.peek().is_some() {
let now = Instant::now();
let chunk: Vec<_> = versions.by_ref().take(100).collect();
futures::future::try_join_all(chunk).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
chunk_index += 1;
let elapsed = now.elapsed();
info!("Chunk {} Elapsed: {:.2?}", chunk_index, elapsed);
}
}
futures::future::try_join_all(version_futures).await?;
upload_file_to_bucket(
format!(
@@ -157,6 +157,7 @@ pub async fn retrieve_data(uploaded_files: &mut Vec<String>) -> Result<VersionMa
serde_json::to_vec(&*cloned_manifest.lock().await)?,
Some("application/json".to_string()),
uploaded_files_mutex.as_ref(),
semaphore,
)
.await?;