From 3fc55184a7e0c42aa3cd35542dfdaf40f8dd212c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois-Xavier=20Talbot?= <108630700+fetchfern@users.noreply.github.com> Date: Sun, 14 Sep 2025 11:44:52 -0400 Subject: [PATCH] Support alternative read-replica PgPool (#4374) * Add ReadOnlyPgPool * Clippy, fmt --- apps/labrinth/src/database/mod.rs | 6 +-- .../src/database/postgres_database.rs | 51 ++++++++++++++++++- apps/labrinth/src/lib.rs | 5 ++ apps/labrinth/src/main.rs | 3 +- apps/labrinth/src/routes/v2/version_file.rs | 7 +-- apps/labrinth/src/routes/v3/version_file.rs | 28 +++++----- apps/labrinth/tests/common/database.rs | 9 +++- apps/labrinth/tests/common/mod.rs | 2 + 8 files changed, 88 insertions(+), 23 deletions(-) diff --git a/apps/labrinth/src/database/mod.rs b/apps/labrinth/src/database/mod.rs index cd84e4e2c..b59954855 100644 --- a/apps/labrinth/src/database/mod.rs +++ b/apps/labrinth/src/database/mod.rs @@ -4,6 +4,6 @@ pub mod redis; pub use models::DBImage; pub use models::DBProject; pub use models::DBVersion; -pub use postgres_database::check_for_migrations; -pub use postgres_database::connect; -pub use postgres_database::register_and_set_metrics; +pub use postgres_database::{ + ReadOnlyPgPool, check_for_migrations, connect_all, register_and_set_metrics, +}; diff --git a/apps/labrinth/src/database/postgres_database.rs b/apps/labrinth/src/database/postgres_database.rs index 3ace44dfe..f0cf7b5f3 100644 --- a/apps/labrinth/src/database/postgres_database.rs +++ b/apps/labrinth/src/database/postgres_database.rs @@ -2,10 +2,35 @@ use prometheus::{IntGauge, Registry}; use sqlx::migrate::MigrateDatabase; use sqlx::postgres::{PgPool, PgPoolOptions}; use sqlx::{Connection, PgConnection, Postgres}; +use std::ops::{Deref, DerefMut}; use std::time::Duration; use tracing::info; -pub async fn connect() -> Result { +#[derive(Clone)] +#[repr(transparent)] +pub struct ReadOnlyPgPool(PgPool); + +impl From for ReadOnlyPgPool { + fn from(pool: PgPool) -> Self { + ReadOnlyPgPool(pool) + } +} + +impl Deref for ReadOnlyPgPool { + type Target = PgPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ReadOnlyPgPool { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> { info!("Initializing database connection"); let database_url = dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); @@ -26,7 +51,29 @@ pub async fn connect() -> Result { .connect(&database_url) .await?; - Ok(pool) + if let Ok(url) = dotenvy::var("READONLY_DATABASE_URL") { + let ro_pool = PgPoolOptions::new() + .min_connections( + dotenvy::var("READONLY_DATABASE_MIN_CONNECTIONS") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(0), + ) + .max_connections( + dotenvy::var("READONLY_DATABASE_MAX_CONNECTIONS") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(1), + ) + .max_lifetime(Some(Duration::from_secs(60 * 60))) + .connect(&url) + .await?; + + Ok((pool, ReadOnlyPgPool(ro_pool))) + } else { + let ro = ReadOnlyPgPool(pool.clone()); + Ok((pool, ro)) + } } pub async fn check_for_migrations() -> Result<(), sqlx::Error> { let uri = dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index ded9b0073..9bcad3c60 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -15,6 +15,7 @@ use clickhouse_crate::Client; use util::cors::default_cors; use crate::background_task::update_versions; +use crate::database::ReadOnlyPgPool; use crate::queue::moderation::AutomatedModerationQueue; use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; @@ -42,6 +43,7 @@ pub struct Pepper { #[derive(Clone)] pub struct LabrinthConfig { pub pool: sqlx::Pool, + pub ro_pool: ReadOnlyPgPool, pub redis_pool: RedisPool, pub clickhouse: Client, pub file_host: Arc, @@ -61,6 +63,7 @@ pub struct LabrinthConfig { #[allow(clippy::too_many_arguments)] pub fn app_setup( pool: sqlx::Pool, + ro_pool: ReadOnlyPgPool, redis_pool: RedisPool, search_config: search::SearchConfig, clickhouse: &mut Client, @@ -265,6 +268,7 @@ pub fn app_setup( LabrinthConfig { pool, + ro_pool, redis_pool, clickhouse: clickhouse.clone(), file_host, @@ -300,6 +304,7 @@ pub fn app_config( })) .app_data(web::Data::new(labrinth_config.redis_pool.clone())) .app_data(web::Data::new(labrinth_config.pool.clone())) + .app_data(web::Data::new(labrinth_config.ro_pool.clone())) .app_data(web::Data::new(labrinth_config.file_host.clone())) .app_data(web::Data::new(labrinth_config.search_config.clone())) .app_data(labrinth_config.session_queue.clone()) diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index 54bae0fc1..c3942b19c 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -83,7 +83,7 @@ async fn main() -> std::io::Result<()> { } // Database Connector - let pool = database::connect() + let (pool, ro_pool) = database::connect_all() .await .expect("Database connection failed"); @@ -167,6 +167,7 @@ async fn main() -> std::io::Result<()> { let labrinth_config = labrinth::app_setup( pool.clone(), + ro_pool.clone(), redis_pool.clone(), search_config.clone(), &mut clickhouse, diff --git a/apps/labrinth/src/routes/v2/version_file.rs b/apps/labrinth/src/routes/v2/version_file.rs index 11d4a50f7..1d843b5b0 100644 --- a/apps/labrinth/src/routes/v2/version_file.rs +++ b/apps/labrinth/src/routes/v2/version_file.rs @@ -1,4 +1,5 @@ use super::ApiError; +use crate::database::ReadOnlyPgPool; use crate::database::redis::RedisPool; use crate::models::projects::{Project, Version, VersionType}; use crate::models::v2::projects::{LegacyProject, LegacyVersion}; @@ -116,7 +117,7 @@ pub struct UpdateData { pub async fn get_update_from_hash( req: HttpRequest, info: web::Path<(String,)>, - pool: web::Data, + pool: web::Data, redis: web::Data, hash_query: web::Query, update_data: web::Json, @@ -170,7 +171,7 @@ pub struct FileHashes { #[post("")] pub async fn get_versions_from_hashes( req: HttpRequest, - pool: web::Data, + pool: web::Data, redis: web::Data, file_data: web::Json, session_queue: web::Data, @@ -277,7 +278,7 @@ pub struct ManyUpdateData { #[post("update")] pub async fn update_files( - pool: web::Data, + pool: web::Data, redis: web::Data, update_data: web::Json, ) -> Result { diff --git a/apps/labrinth/src/routes/v3/version_file.rs b/apps/labrinth/src/routes/v3/version_file.rs index 49567dd15..29a7fedda 100644 --- a/apps/labrinth/src/routes/v3/version_file.rs +++ b/apps/labrinth/src/routes/v3/version_file.rs @@ -1,6 +1,7 @@ use super::ApiError; use crate::auth::checks::{filter_visible_versions, is_visible_version}; use crate::auth::{filter_visible_projects, get_user_from_headers}; +use crate::database::ReadOnlyPgPool; use crate::database::redis::RedisPool; use crate::models::ids::VersionId; use crate::models::pats::Scopes; @@ -121,7 +122,7 @@ pub struct UpdateData { pub async fn get_update_from_hash( req: HttpRequest, info: web::Path<(String,)>, - pool: web::Data, + pool: web::Data, redis: web::Data, hash_query: web::Query, update_data: web::Json, @@ -129,7 +130,7 @@ pub async fn get_update_from_hash( ) -> Result { let user_option = get_user_from_headers( &req, - &**pool, + &***pool, &redis, &session_queue, Scopes::VERSION_READ, @@ -144,20 +145,20 @@ pub async fn get_update_from_hash( }), hash, hash_query.version_id.map(|x| x.into()), - &**pool, + &***pool, &redis, ) .await? && let Some(project) = database::models::DBProject::get_id( file.project_id, - &**pool, + &***pool, &redis, ) .await? { let mut versions = database::models::DBVersion::get_many( &project.versions, - &**pool, + &***pool, &redis, ) .await? @@ -211,14 +212,14 @@ pub struct FileHashes { pub async fn get_versions_from_hashes( req: HttpRequest, - pool: web::Data, + pool: web::Data, redis: web::Data, file_data: web::Json, session_queue: web::Data, ) -> Result { let user_option = get_user_from_headers( &req, - &**pool, + &***pool, &redis, &session_queue, Scopes::VERSION_READ, @@ -235,14 +236,14 @@ pub async fn get_versions_from_hashes( let files = database::models::DBVersion::get_files_from_hash( algorithm.clone(), &file_data.hashes, - &**pool, + &***pool, &redis, ) .await?; let version_ids = files.iter().map(|x| x.version_id).collect::>(); let versions_data = filter_visible_versions( - database::models::DBVersion::get_many(&version_ids, &**pool, &redis) + database::models::DBVersion::get_many(&version_ids, &***pool, &redis) .await?, &user_option, &pool, @@ -329,8 +330,9 @@ pub struct ManyUpdateData { pub game_versions: Option>, pub version_types: Option>, } + pub async fn update_files( - pool: web::Data, + pool: web::Data, redis: web::Data, update_data: web::Json, ) -> Result { @@ -341,7 +343,7 @@ pub async fn update_files( let files = database::models::DBVersion::get_files_from_hash( algorithm.clone(), &update_data.hashes, - &**pool, + &***pool, &redis, ) .await?; @@ -364,7 +366,7 @@ pub async fn update_files( &update_data.loaders.clone().unwrap_or_default(), &update_data.version_types.clone().unwrap_or_default().iter().map(|x| x.to_string()).collect::>(), ) - .fetch(&**pool) + .fetch(&***pool) .try_fold(DashMap::new(), |acc : DashMap<_,Vec>, m| { acc.entry(database::models::DBProjectId(m.mod_id)) .or_default() @@ -378,7 +380,7 @@ pub async fn update_files( .into_iter() .filter_map(|x| x.1.last().copied()) .collect::>(), - &**pool, + &***pool, &redis, ) .await?; diff --git a/apps/labrinth/tests/common/database.rs b/apps/labrinth/tests/common/database.rs index 6297838f2..209c199a5 100644 --- a/apps/labrinth/tests/common/database.rs +++ b/apps/labrinth/tests/common/database.rs @@ -1,4 +1,6 @@ -use labrinth::{database::redis::RedisPool, search}; +use labrinth::database::ReadOnlyPgPool; +use labrinth::database::redis::RedisPool; +use labrinth::search; use sqlx::{PgPool, postgres::PgPoolOptions}; use std::time::Duration; use url::Url; @@ -36,6 +38,7 @@ const TEMPLATE_DATABASE_NAME: &str = "labrinth_tests_template"; #[derive(Clone)] pub struct TemporaryDatabase { pub pool: PgPool, + pub ro_pool: ReadOnlyPgPool, pub redis_pool: RedisPool, pub search_config: labrinth::search::SearchConfig, pub database_name: String, @@ -74,6 +77,8 @@ impl TemporaryDatabase { .await .expect("Connection to temporary database failed"); + let ro_pool = ReadOnlyPgPool::from(pool.clone()); + println!("Running migrations on temporary database"); // Performs migrations @@ -90,6 +95,7 @@ impl TemporaryDatabase { search::SearchConfig::new(Some(temp_database_name.clone())); Self { pool, + ro_pool, database_name: temp_database_name, redis_pool, search_config, @@ -184,6 +190,7 @@ impl TemporaryDatabase { let name = generate_random_name("test_template_"); let db = TemporaryDatabase { pool: pool.clone(), + ro_pool: ReadOnlyPgPool::from(pool.clone()), database_name: TEMPLATE_DATABASE_NAME.to_string(), redis_pool: RedisPool::new(Some(name.clone())), search_config: search::SearchConfig::new(Some(name)), diff --git a/apps/labrinth/tests/common/mod.rs b/apps/labrinth/tests/common/mod.rs index 625c8498c..3b1baa96a 100644 --- a/apps/labrinth/tests/common/mod.rs +++ b/apps/labrinth/tests/common/mod.rs @@ -26,6 +26,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { } let pool = db.pool.clone(); + let ro_pool = db.ro_pool.clone(); let redis_pool = db.redis_pool.clone(); let search_config = db.search_config.clone(); let file_host: Arc = @@ -40,6 +41,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { labrinth::app_setup( pool.clone(), + ro_pool.clone(), redis_pool.clone(), search_config, &mut clickhouse,