feat(app-backend): key circuit breaker by URI path pattern (#6383)

* feat(app-lib): split `FetchFence` on base URI path

* chore: fmt

* fix(app-lib): fix test timing
This commit is contained in:
François-Xavier Talbot
2026-06-16 14:49:06 -04:00
committed by GitHub
parent a3aeeac2c3
commit 3aaa2ef071
13 changed files with 155 additions and 24 deletions
+2
View File
@@ -78,6 +78,7 @@ pub async fn auto_install_java(java_version: u32) -> crate::Result<PathBuf> {
),
None,
None,
None,
&state.fetch_semaphore,
&state.pool,
).await?;
@@ -92,6 +93,7 @@ pub async fn auto_install_java(java_version: u32) -> crate::Result<PathBuf> {
None,
None,
Some((&loading_bar, 80.0)),
None,
&state.fetch_semaphore,
&state.pool,
)
@@ -82,6 +82,7 @@ pub async fn import_curseforge(
&thumbnail_url,
None,
None,
None,
&state.fetch_semaphore,
&state.pool,
)
@@ -326,6 +326,7 @@ pub async fn generate_pack_from_version_id(
None,
Some(&download_meta),
Some((&loading_bar, 70.0)),
None,
&state.fetch_semaphore,
&state.pool,
)
@@ -356,6 +357,7 @@ pub async fn generate_pack_from_version_id(
&icon_url,
None,
None,
None,
&state.fetch_semaphore,
&state.pool,
)
@@ -441,6 +441,7 @@ pub async fn install_zipped_mrpack_files(
.collect::<Vec<&str>>(),
project.hashes.get(&PackFileHash::Sha1).map(|x| &**x),
Some(&download_meta),
None,
&state.fetch_semaphore,
&state.pool,
)
@@ -113,6 +113,7 @@ pub async fn profile_create(
icon,
None,
None,
None,
&state.fetch_semaphore,
&state.pool,
)
+2 -2
View File
@@ -68,8 +68,8 @@ pub enum ErrorKind {
#[error("Error fetching URL: {0}")]
FetchError(#[from] reqwest::Error),
#[error("Too many API errors; temporarily blocked")]
ApiIsDownError,
#[error("Too many API errors, try again in {0} minutes")]
ApiIsDownError(u32),
#[error("{0}")]
LabrinthError(LabrinthError),
+18 -4
View File
@@ -88,6 +88,7 @@ pub async fn download_version_info(
&version.url,
None,
None,
None,
&st.api_semaphore,
&st.pool,
)
@@ -99,6 +100,7 @@ pub async fn download_version_info(
&loader.url,
None,
None,
None,
&st.api_semaphore,
&st.pool,
)
@@ -149,6 +151,7 @@ pub async fn download_client(
&client_download.url,
Some(&client_download.sha1),
None,
None,
&st.fetch_semaphore,
&st.pool,
)
@@ -189,6 +192,7 @@ pub async fn download_assets_index(
&version.asset_index.url,
None,
None,
None,
&st.fetch_semaphore,
&st.pool,
)
@@ -239,7 +243,7 @@ pub async fn download_assets(
async {
if !resource_path.exists() || force {
let resource = fetch_cell
.get_or_try_init(|| fetch(&url, Some(hash), None, &st.fetch_semaphore, &st.pool))
.get_or_try_init(|| fetch(&url, Some(hash), None, None, &st.fetch_semaphore, &st.pool))
.await?;
write(&resource_path, resource, &st.io_semaphore).await?;
tracing::trace!("Fetched asset with hash {hash}");
@@ -253,7 +257,7 @@ pub async fn download_assets(
if with_legacy && !resource_path.exists() || force {
let resource = fetch_cell
.get_or_try_init(|| fetch(&url, Some(hash), None, &st.fetch_semaphore, &st.pool))
.get_or_try_init(|| fetch(&url, Some(hash), None, None, &st.fetch_semaphore, &st.pool))
.await?;
write(&resource_path, resource, &st.io_semaphore).await?;
tracing::trace!("Fetched legacy asset with hash {hash}");
@@ -328,6 +332,7 @@ pub async fn download_libraries(
&native.url,
Some(&native.sha1),
None,
None,
&st.fetch_semaphore,
&st.pool,
)
@@ -373,6 +378,7 @@ pub async fn download_libraries(
&artifact.url,
Some(&artifact.sha1),
None,
None,
&st.fetch_semaphore,
&st.pool,
)
@@ -409,8 +415,15 @@ pub async fn download_libraries(
// failed download here is not a fatal condition.
//
// See DEV-479.
match fetch(&url, None, None, &st.fetch_semaphore, &st.pool)
.await
match fetch(
&url,
None,
None,
None,
&st.fetch_semaphore,
&st.pool,
)
.await
{
Ok(bytes) => {
write(&path, &bytes, &st.io_semaphore).await?;
@@ -470,6 +483,7 @@ pub async fn download_log_config(
&log_download.url,
Some(&log_download.sha1),
None,
None,
&st.fetch_semaphore,
&st.pool,
)
+24 -2
View File
@@ -1090,6 +1090,7 @@ impl CachedEntry {
method: Method,
api_url: &str,
url: &str,
uri_path: Option<&'static str>,
keys: &DashSet<impl Display + Eq + Hash + Serialize>,
fetch_semaphore: &FetchSemaphore,
pool: &SqlitePool,
@@ -1112,6 +1113,7 @@ impl CachedEntry {
url,
None,
None,
uri_path,
fetch_semaphore,
pool,
)
@@ -1122,11 +1124,12 @@ impl CachedEntry {
}
macro_rules! fetch_original_values {
($type:ident, $api_url:expr, $url_suffix:expr, $cache_variant:path) => {{
($type:ident, $api_url:expr, $url_suffix:expr, $uri_path:expr, $cache_variant:path) => {{
let mut results = fetch_many_batched(
Method::GET,
$api_url,
&format!("{}?ids=", $url_suffix),
$uri_path,
&keys,
&fetch_semaphore,
&pool,
@@ -1182,7 +1185,7 @@ impl CachedEntry {
}
macro_rules! fetch_original_value {
($type:ident, $api_url:expr, $url_suffix:expr, $cache_variant:path) => {{
($type:ident, $api_url:expr, $url_suffix:expr, $uri_path:expr, $cache_variant:path) => {{
vec![(
$cache_variant(
fetch_json(
@@ -1190,6 +1193,7 @@ impl CachedEntry {
&*format!("{}{}", $api_url, $url_suffix),
None,
None,
$uri_path,
&fetch_semaphore,
pool,
)
@@ -1207,6 +1211,7 @@ impl CachedEntry {
Project,
env!("MODRINTH_API_URL"),
"projects",
Some("/v2/projects"),
CacheValue::Project
)
}
@@ -1215,6 +1220,7 @@ impl CachedEntry {
ProjectV3,
env!("MODRINTH_API_URL_V3"),
"projects",
Some("/v3/projects"),
CacheValue::ProjectV3
)
}
@@ -1223,6 +1229,7 @@ impl CachedEntry {
Version,
env!("MODRINTH_API_URL"),
"versions",
Some("/v2/versions"),
CacheValue::Version
)
}
@@ -1231,6 +1238,7 @@ impl CachedEntry {
User,
env!("MODRINTH_API_URL"),
"users",
Some("/v2/users"),
CacheValue::User
)
}
@@ -1239,6 +1247,7 @@ impl CachedEntry {
Method::GET,
env!("MODRINTH_API_URL_V3"),
"teams?ids=",
Some("/v3/teams"),
&keys,
fetch_semaphore,
pool,
@@ -1278,6 +1287,7 @@ impl CachedEntry {
Method::GET,
env!("MODRINTH_API_URL_V3"),
"organizations?ids=",
Some("/v3/organizations"),
&keys,
fetch_semaphore,
pool,
@@ -1337,6 +1347,7 @@ impl CachedEntry {
"algorithm": "sha1",
"hashes": &keys,
})),
Some("/v2/version_files"),
fetch_semaphore,
pool,
)
@@ -1403,6 +1414,7 @@ impl CachedEntry {
url,
None,
None,
None,
fetch_semaphore,
pool,
)
@@ -1431,6 +1443,7 @@ impl CachedEntry {
"minecraft/v{}/manifest.json",
daedalus::minecraft::CURRENT_FORMAT_VERSION
),
None,
CacheValue::MinecraftManifest
)
}
@@ -1439,6 +1452,7 @@ impl CachedEntry {
Categories,
env!("MODRINTH_API_URL"),
"tag/category",
Some("/v2/tag/category"),
CacheValue::Categories
)
}
@@ -1447,6 +1461,7 @@ impl CachedEntry {
ReportTypes,
env!("MODRINTH_API_URL"),
"tag/report_type",
Some("/v2/tag/report_type"),
CacheValue::ReportTypes
)
}
@@ -1455,6 +1470,7 @@ impl CachedEntry {
Loaders,
env!("MODRINTH_API_URL"),
"tag/loader",
Some("/v2/tag/loader"),
CacheValue::Loaders
)
}
@@ -1463,6 +1479,7 @@ impl CachedEntry {
GameVersions,
env!("MODRINTH_API_URL"),
"tag/game_version",
Some("/v2/tag/game_version"),
CacheValue::GameVersions
)
}
@@ -1471,6 +1488,7 @@ impl CachedEntry {
DonationPlatforms,
env!("MODRINTH_API_URL"),
"tag/donation_platform",
Some("/v2/tag/donation_platform"),
CacheValue::DonationPlatforms
)
}
@@ -1629,6 +1647,7 @@ impl CachedEntry {
"game_versions": [game_version],
"version_types": version_types
})),
Some("/v2/version_files/update_many"),
fetch_semaphore,
pool,
)
@@ -1754,6 +1773,7 @@ impl CachedEntry {
url,
None,
None,
Some("/v2/search"),
fetch_semaphore,
pool,
)
@@ -1795,6 +1815,7 @@ impl CachedEntry {
&url,
None,
None,
Some("/v2/project/:id/version"),
fetch_semaphore,
pool,
)
@@ -1846,6 +1867,7 @@ impl CachedEntry {
url,
None,
None,
Some("/v3/search"),
fetch_semaphore,
pool,
)
+3
View File
@@ -331,6 +331,7 @@ impl FriendsSocket {
concat!(env!("MODRINTH_API_URL_V3"), "friends"),
None,
None,
Some("/v3/friends"),
semaphore,
exec,
)
@@ -359,6 +360,7 @@ impl FriendsSocket {
None,
None,
None,
Some("/v3/friend/:user_id"),
semaphore,
exec,
)
@@ -392,6 +394,7 @@ impl FriendsSocket {
None,
None,
None,
Some("/v3/friend/:user_id"),
semaphore,
exec,
)
@@ -895,6 +895,7 @@ async fn get_modpack_identifiers(
&[&primary_file.url],
primary_file.hashes.get("sha1").map(|s| s.as_str()),
Some(&download_meta),
None,
fetch_semaphore,
pool,
)
+2
View File
@@ -36,6 +36,7 @@ impl ModrinthCredentials {
Some(("Authorization", &*creds.session)),
None,
None,
Some("/v2/session/refresh"),
semaphore,
exec,
)
@@ -228,6 +229,7 @@ async fn fetch_info(
Some(("Authorization", token)),
None,
None,
Some("/v2/user"),
semaphore,
exec,
)
+1
View File
@@ -1588,6 +1588,7 @@ impl Profile {
&file.url,
file.hashes.get("sha1").map(|x| &**x),
Some(&download_meta),
None,
fetch_semaphore,
pool,
)
+97 -16
View File
@@ -10,7 +10,7 @@ use rand::Rng;
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
@@ -50,20 +50,48 @@ pub struct IoSemaphore(pub Semaphore);
pub struct FetchSemaphore(pub Semaphore);
struct FetchFence {
inner: Mutex<FenceInner>,
inner: Mutex<HashMap<&'static str, FenceInner>>,
}
impl FetchFence {
pub fn is_blocked(&self) -> bool {
self.inner.lock().is_blocked()
pub fn is_blocked(&self, key: &'static str) -> bool {
self.inner
.lock()
.entry(key)
.or_insert_with(FenceInner::new)
.is_blocked()
}
pub fn record_ok(&self) {
self.inner.lock().record_ok()
pub fn record_ok(&self, key: &'static str) {
self.inner
.lock()
.entry(key)
.or_insert_with(FenceInner::new)
.record_ok()
}
pub fn record_fail(&self) {
self.inner.lock().record_fail()
pub fn record_fail(&self, key: &'static str) {
self.inner
.lock()
.entry(key)
.or_insert_with(FenceInner::new)
.record_fail()
}
pub fn latest_block_minutes(&self) -> u32 {
let now = Utc::now();
self.inner
.lock()
.values()
.filter_map(|fence| fence.block_until)
.filter(|until| *until > now)
.max()
.map(|until| {
let seconds = until.signed_duration_since(now).num_seconds();
(seconds.max(0) as u32).div_ceil(60).max(1)
})
.unwrap_or(1)
}
}
@@ -154,7 +182,7 @@ impl FenceInner {
static GLOBAL_FETCH_FENCE: LazyLock<FetchFence> =
LazyLock::new(|| FetchFence {
inner: Mutex::new(FenceInner::new()),
inner: Mutex::new(HashMap::new()),
});
fn reqwest_client_builder() -> reqwest::ClientBuilder {
@@ -184,6 +212,7 @@ pub async fn fetch(
url: &str,
sha1: Option<&str>,
download_meta: Option<&DownloadMeta>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<Bytes> {
@@ -195,6 +224,7 @@ pub async fn fetch(
None,
download_meta,
None,
uri_path,
semaphore,
exec,
)
@@ -206,6 +236,7 @@ pub async fn fetch_with_client(
url: &str,
sha1: Option<&str>,
download_meta: Option<&DownloadMeta>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
client: &reqwest::Client,
@@ -218,6 +249,7 @@ pub async fn fetch_with_client(
None,
download_meta,
None,
uri_path,
semaphore,
exec,
client,
@@ -231,6 +263,7 @@ pub async fn fetch_json<T>(
url: &str,
sha1: Option<&str>,
json_body: Option<serde_json::Value>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<T>
@@ -238,7 +271,8 @@ where
T: DeserializeOwned,
{
let result = fetch_advanced(
method, url, sha1, json_body, None, None, None, semaphore, exec,
method, url, sha1, json_body, None, None, None, uri_path, semaphore,
exec,
)
.await?;
let value = serde_json::from_slice(&result)?;
@@ -257,6 +291,7 @@ pub async fn fetch_advanced(
header: Option<(&str, &str)>,
download_meta: Option<&DownloadMeta>,
loading_bar: Option<(&LoadingBarId, f64)>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
) -> crate::Result<Bytes> {
@@ -268,6 +303,7 @@ pub async fn fetch_advanced(
header,
download_meta,
loading_bar,
uri_path,
semaphore,
exec,
&INSECURE_REQWEST_CLIENT,
@@ -286,6 +322,7 @@ pub async fn fetch_advanced_with_client(
header: Option<(&str, &str)>,
download_meta: Option<&DownloadMeta>,
loading_bar: Option<(&LoadingBarId, f64)>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite>,
client: &reqwest::Client,
@@ -294,6 +331,7 @@ pub async fn fetch_advanced_with_client(
let is_api_url = url.starts_with(env!("MODRINTH_API_URL"))
|| url.starts_with(env!("MODRINTH_API_URL_V3"));
let fence_key = if is_api_url { uri_path } else { None };
let creds = if header
.as_ref()
@@ -309,8 +347,13 @@ pub async fn fetch_advanced_with_client(
.map(|m| (DOWNLOAD_META_HEADER.to_string(), m.to_header_value()));
for attempt in 1..=(FETCH_ATTEMPTS + 1) {
if is_api_url && GLOBAL_FETCH_FENCE.is_blocked() {
return Err(ErrorKind::ApiIsDownError.into());
if let Some(fence_key) = fence_key
&& GLOBAL_FETCH_FENCE.is_blocked(fence_key)
{
return Err(ErrorKind::ApiIsDownError(
GLOBAL_FETCH_FENCE.latest_block_minutes(),
)
.into());
}
let mut req = client.request(method.clone(), url);
@@ -336,8 +379,8 @@ pub async fn fetch_advanced_with_client(
match result {
Ok(resp) => {
if resp.status().is_server_error() {
if is_api_url {
GLOBAL_FETCH_FENCE.record_fail();
if let Some(fence_key) = fence_key {
GLOBAL_FETCH_FENCE.record_fail(fence_key);
}
if attempt <= FETCH_ATTEMPTS {
@@ -400,8 +443,8 @@ pub async fn fetch_advanced_with_client(
tracing::trace!("Done downloading URL {url}");
if is_api_url {
GLOBAL_FETCH_FENCE.record_ok();
if let Some(fence_key) = fence_key {
GLOBAL_FETCH_FENCE.record_ok(fence_key);
}
return Ok(bytes);
@@ -427,6 +470,7 @@ pub async fn fetch_mirrors(
mirrors: &[&str],
sha1: Option<&str>,
download_meta: Option<&DownloadMeta>,
uri_path: Option<&'static str>,
semaphore: &FetchSemaphore,
exec: impl sqlx::Executor<'_, Database = sqlx::Sqlite> + Copy,
) -> crate::Result<Bytes> {
@@ -441,6 +485,7 @@ pub async fn fetch_mirrors(
mirror,
sha1,
download_meta,
uri_path,
semaphore,
exec,
&REQWEST_CLIENT,
@@ -620,6 +665,42 @@ mod tests {
assert!(fence.is_blocked());
}
#[test]
fn test_fetch_fence_keys_are_independent() {
let fence = FetchFence {
inner: Mutex::new(HashMap::new()),
};
for _ in 0..FenceInner::FAILURE_THRESHOLD {
fence.record_fail("/v3/version_file/:sha1/update");
}
assert!(fence.is_blocked("/v3/version_file/:sha1/update"));
assert!(!fence.is_blocked("/v3/project/:id"));
}
#[test]
fn test_fetch_fence_latest_block_minutes() {
let fence = FetchFence {
inner: Mutex::new(HashMap::new()),
};
{
let mut inner = fence.inner.lock();
inner.insert("/expired", FenceInner::new());
inner.get_mut("/expired").unwrap().block_until =
Some(Utc::now() - TimeDelta::minutes(1));
inner.insert("/short", FenceInner::new());
inner.get_mut("/short").unwrap().block_until =
Some(Utc::now() + TimeDelta::seconds(61));
inner.insert("/long", FenceInner::new());
inner.get_mut("/long").unwrap().block_until =
Some(Utc::now() + TimeDelta::seconds(140));
}
assert_eq!(fence.latest_block_minutes(), 3);
}
#[test]
fn test_fence_block_after_4_fails_with_oks() {
// Update tests if the FenceInner constants change