Support alternative read-replica PgPool (#4374)

* Add ReadOnlyPgPool

* Clippy, fmt
This commit is contained in:
François-Xavier Talbot
2025-09-14 11:44:52 -04:00
committed by GitHub
parent 67e090565e
commit 3fc55184a7
8 changed files with 88 additions and 23 deletions

View File

@@ -4,6 +4,6 @@ pub mod redis;
pub use models::DBImage; pub use models::DBImage;
pub use models::DBProject; pub use models::DBProject;
pub use models::DBVersion; pub use models::DBVersion;
pub use postgres_database::check_for_migrations; pub use postgres_database::{
pub use postgres_database::connect; ReadOnlyPgPool, check_for_migrations, connect_all, register_and_set_metrics,
pub use postgres_database::register_and_set_metrics; };

View File

@@ -2,10 +2,35 @@ use prometheus::{IntGauge, Registry};
use sqlx::migrate::MigrateDatabase; use sqlx::migrate::MigrateDatabase;
use sqlx::postgres::{PgPool, PgPoolOptions}; use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::{Connection, PgConnection, Postgres}; use sqlx::{Connection, PgConnection, Postgres};
use std::ops::{Deref, DerefMut};
use std::time::Duration; use std::time::Duration;
use tracing::info; use tracing::info;
pub async fn connect() -> Result<PgPool, sqlx::Error> { #[derive(Clone)]
/// Newtype wrapper around a [`PgPool`] that is intended, by convention, for
/// read-only queries (e.g. pointed at a read replica). The wrapper derefs to
/// `PgPool`, so it can be used anywhere a pool reference is expected; the
/// distinct type exists so handlers can opt into the replica explicitly via
/// `web::Data<ReadOnlyPgPool>`.
/// `#[repr(transparent)]` guarantees the same in-memory layout as `PgPool`.
#[repr(transparent)]
pub struct ReadOnlyPgPool(PgPool);
impl From<PgPool> for ReadOnlyPgPool {
fn from(pool: PgPool) -> Self {
ReadOnlyPgPool(pool)
}
}
impl Deref for ReadOnlyPgPool {
type Target = PgPool;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ReadOnlyPgPool {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> {
info!("Initializing database connection"); info!("Initializing database connection");
let database_url = let database_url =
dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env");
@@ -26,7 +51,29 @@ pub async fn connect() -> Result<PgPool, sqlx::Error> {
.connect(&database_url) .connect(&database_url)
.await?; .await?;
Ok(pool) if let Ok(url) = dotenvy::var("READONLY_DATABASE_URL") {
let ro_pool = PgPoolOptions::new()
.min_connections(
dotenvy::var("READONLY_DATABASE_MIN_CONNECTIONS")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(0),
)
.max_connections(
dotenvy::var("READONLY_DATABASE_MAX_CONNECTIONS")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(1),
)
.max_lifetime(Some(Duration::from_secs(60 * 60)))
.connect(&url)
.await?;
Ok((pool, ReadOnlyPgPool(ro_pool)))
} else {
let ro = ReadOnlyPgPool(pool.clone());
Ok((pool, ro))
}
} }
pub async fn check_for_migrations() -> Result<(), sqlx::Error> { pub async fn check_for_migrations() -> Result<(), sqlx::Error> {
let uri = dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); let uri = dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env");

View File

@@ -15,6 +15,7 @@ use clickhouse_crate::Client;
use util::cors::default_cors; use util::cors::default_cors;
use crate::background_task::update_versions; use crate::background_task::update_versions;
use crate::database::ReadOnlyPgPool;
use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::moderation::AutomatedModerationQueue;
use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::env::{parse_strings_from_var, parse_var};
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
@@ -42,6 +43,7 @@ pub struct Pepper {
#[derive(Clone)] #[derive(Clone)]
pub struct LabrinthConfig { pub struct LabrinthConfig {
pub pool: sqlx::Pool<Postgres>, pub pool: sqlx::Pool<Postgres>,
pub ro_pool: ReadOnlyPgPool,
pub redis_pool: RedisPool, pub redis_pool: RedisPool,
pub clickhouse: Client, pub clickhouse: Client,
pub file_host: Arc<dyn file_hosting::FileHost + Send + Sync>, pub file_host: Arc<dyn file_hosting::FileHost + Send + Sync>,
@@ -61,6 +63,7 @@ pub struct LabrinthConfig {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn app_setup( pub fn app_setup(
pool: sqlx::Pool<Postgres>, pool: sqlx::Pool<Postgres>,
ro_pool: ReadOnlyPgPool,
redis_pool: RedisPool, redis_pool: RedisPool,
search_config: search::SearchConfig, search_config: search::SearchConfig,
clickhouse: &mut Client, clickhouse: &mut Client,
@@ -265,6 +268,7 @@ pub fn app_setup(
LabrinthConfig { LabrinthConfig {
pool, pool,
ro_pool,
redis_pool, redis_pool,
clickhouse: clickhouse.clone(), clickhouse: clickhouse.clone(),
file_host, file_host,
@@ -300,6 +304,7 @@ pub fn app_config(
})) }))
.app_data(web::Data::new(labrinth_config.redis_pool.clone())) .app_data(web::Data::new(labrinth_config.redis_pool.clone()))
.app_data(web::Data::new(labrinth_config.pool.clone())) .app_data(web::Data::new(labrinth_config.pool.clone()))
.app_data(web::Data::new(labrinth_config.ro_pool.clone()))
.app_data(web::Data::new(labrinth_config.file_host.clone())) .app_data(web::Data::new(labrinth_config.file_host.clone()))
.app_data(web::Data::new(labrinth_config.search_config.clone())) .app_data(web::Data::new(labrinth_config.search_config.clone()))
.app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.session_queue.clone())

View File

@@ -83,7 +83,7 @@ async fn main() -> std::io::Result<()> {
} }
// Database Connector // Database Connector
let pool = database::connect() let (pool, ro_pool) = database::connect_all()
.await .await
.expect("Database connection failed"); .expect("Database connection failed");
@@ -167,6 +167,7 @@ async fn main() -> std::io::Result<()> {
let labrinth_config = labrinth::app_setup( let labrinth_config = labrinth::app_setup(
pool.clone(), pool.clone(),
ro_pool.clone(),
redis_pool.clone(), redis_pool.clone(),
search_config.clone(), search_config.clone(),
&mut clickhouse, &mut clickhouse,

View File

@@ -1,4 +1,5 @@
use super::ApiError; use super::ApiError;
use crate::database::ReadOnlyPgPool;
use crate::database::redis::RedisPool; use crate::database::redis::RedisPool;
use crate::models::projects::{Project, Version, VersionType}; use crate::models::projects::{Project, Version, VersionType};
use crate::models::v2::projects::{LegacyProject, LegacyVersion}; use crate::models::v2::projects::{LegacyProject, LegacyVersion};
@@ -116,7 +117,7 @@ pub struct UpdateData {
pub async fn get_update_from_hash( pub async fn get_update_from_hash(
req: HttpRequest, req: HttpRequest,
info: web::Path<(String,)>, info: web::Path<(String,)>,
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
hash_query: web::Query<HashQuery>, hash_query: web::Query<HashQuery>,
update_data: web::Json<UpdateData>, update_data: web::Json<UpdateData>,
@@ -170,7 +171,7 @@ pub struct FileHashes {
#[post("")] #[post("")]
pub async fn get_versions_from_hashes( pub async fn get_versions_from_hashes(
req: HttpRequest, req: HttpRequest,
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
file_data: web::Json<FileHashes>, file_data: web::Json<FileHashes>,
session_queue: web::Data<AuthQueue>, session_queue: web::Data<AuthQueue>,
@@ -277,7 +278,7 @@ pub struct ManyUpdateData {
#[post("update")] #[post("update")]
pub async fn update_files( pub async fn update_files(
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
update_data: web::Json<ManyUpdateData>, update_data: web::Json<ManyUpdateData>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {

View File

@@ -1,6 +1,7 @@
use super::ApiError; use super::ApiError;
use crate::auth::checks::{filter_visible_versions, is_visible_version}; use crate::auth::checks::{filter_visible_versions, is_visible_version};
use crate::auth::{filter_visible_projects, get_user_from_headers}; use crate::auth::{filter_visible_projects, get_user_from_headers};
use crate::database::ReadOnlyPgPool;
use crate::database::redis::RedisPool; use crate::database::redis::RedisPool;
use crate::models::ids::VersionId; use crate::models::ids::VersionId;
use crate::models::pats::Scopes; use crate::models::pats::Scopes;
@@ -121,7 +122,7 @@ pub struct UpdateData {
pub async fn get_update_from_hash( pub async fn get_update_from_hash(
req: HttpRequest, req: HttpRequest,
info: web::Path<(String,)>, info: web::Path<(String,)>,
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
hash_query: web::Query<HashQuery>, hash_query: web::Query<HashQuery>,
update_data: web::Json<UpdateData>, update_data: web::Json<UpdateData>,
@@ -129,7 +130,7 @@ pub async fn get_update_from_hash(
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let user_option = get_user_from_headers( let user_option = get_user_from_headers(
&req, &req,
&**pool, &***pool,
&redis, &redis,
&session_queue, &session_queue,
Scopes::VERSION_READ, Scopes::VERSION_READ,
@@ -144,20 +145,20 @@ pub async fn get_update_from_hash(
}), }),
hash, hash,
hash_query.version_id.map(|x| x.into()), hash_query.version_id.map(|x| x.into()),
&**pool, &***pool,
&redis, &redis,
) )
.await? .await?
&& let Some(project) = database::models::DBProject::get_id( && let Some(project) = database::models::DBProject::get_id(
file.project_id, file.project_id,
&**pool, &***pool,
&redis, &redis,
) )
.await? .await?
{ {
let mut versions = database::models::DBVersion::get_many( let mut versions = database::models::DBVersion::get_many(
&project.versions, &project.versions,
&**pool, &***pool,
&redis, &redis,
) )
.await? .await?
@@ -211,14 +212,14 @@ pub struct FileHashes {
pub async fn get_versions_from_hashes( pub async fn get_versions_from_hashes(
req: HttpRequest, req: HttpRequest,
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
file_data: web::Json<FileHashes>, file_data: web::Json<FileHashes>,
session_queue: web::Data<AuthQueue>, session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let user_option = get_user_from_headers( let user_option = get_user_from_headers(
&req, &req,
&**pool, &***pool,
&redis, &redis,
&session_queue, &session_queue,
Scopes::VERSION_READ, Scopes::VERSION_READ,
@@ -235,14 +236,14 @@ pub async fn get_versions_from_hashes(
let files = database::models::DBVersion::get_files_from_hash( let files = database::models::DBVersion::get_files_from_hash(
algorithm.clone(), algorithm.clone(),
&file_data.hashes, &file_data.hashes,
&**pool, &***pool,
&redis, &redis,
) )
.await?; .await?;
let version_ids = files.iter().map(|x| x.version_id).collect::<Vec<_>>(); let version_ids = files.iter().map(|x| x.version_id).collect::<Vec<_>>();
let versions_data = filter_visible_versions( let versions_data = filter_visible_versions(
database::models::DBVersion::get_many(&version_ids, &**pool, &redis) database::models::DBVersion::get_many(&version_ids, &***pool, &redis)
.await?, .await?,
&user_option, &user_option,
&pool, &pool,
@@ -329,8 +330,9 @@ pub struct ManyUpdateData {
pub game_versions: Option<Vec<String>>, pub game_versions: Option<Vec<String>>,
pub version_types: Option<Vec<VersionType>>, pub version_types: Option<Vec<VersionType>>,
} }
pub async fn update_files( pub async fn update_files(
pool: web::Data<PgPool>, pool: web::Data<ReadOnlyPgPool>,
redis: web::Data<RedisPool>, redis: web::Data<RedisPool>,
update_data: web::Json<ManyUpdateData>, update_data: web::Json<ManyUpdateData>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
@@ -341,7 +343,7 @@ pub async fn update_files(
let files = database::models::DBVersion::get_files_from_hash( let files = database::models::DBVersion::get_files_from_hash(
algorithm.clone(), algorithm.clone(),
&update_data.hashes, &update_data.hashes,
&**pool, &***pool,
&redis, &redis,
) )
.await?; .await?;
@@ -364,7 +366,7 @@ pub async fn update_files(
&update_data.loaders.clone().unwrap_or_default(), &update_data.loaders.clone().unwrap_or_default(),
&update_data.version_types.clone().unwrap_or_default().iter().map(|x| x.to_string()).collect::<Vec<_>>(), &update_data.version_types.clone().unwrap_or_default().iter().map(|x| x.to_string()).collect::<Vec<_>>(),
) )
.fetch(&**pool) .fetch(&***pool)
.try_fold(DashMap::new(), |acc : DashMap<_,Vec<database::models::ids::DBVersionId>>, m| { .try_fold(DashMap::new(), |acc : DashMap<_,Vec<database::models::ids::DBVersionId>>, m| {
acc.entry(database::models::DBProjectId(m.mod_id)) acc.entry(database::models::DBProjectId(m.mod_id))
.or_default() .or_default()
@@ -378,7 +380,7 @@ pub async fn update_files(
.into_iter() .into_iter()
.filter_map(|x| x.1.last().copied()) .filter_map(|x| x.1.last().copied())
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
&**pool, &***pool,
&redis, &redis,
) )
.await?; .await?;

View File

@@ -1,4 +1,6 @@
use labrinth::{database::redis::RedisPool, search}; use labrinth::database::ReadOnlyPgPool;
use labrinth::database::redis::RedisPool;
use labrinth::search;
use sqlx::{PgPool, postgres::PgPoolOptions}; use sqlx::{PgPool, postgres::PgPoolOptions};
use std::time::Duration; use std::time::Duration;
use url::Url; use url::Url;
@@ -36,6 +38,7 @@ const TEMPLATE_DATABASE_NAME: &str = "labrinth_tests_template";
#[derive(Clone)] #[derive(Clone)]
pub struct TemporaryDatabase { pub struct TemporaryDatabase {
pub pool: PgPool, pub pool: PgPool,
pub ro_pool: ReadOnlyPgPool,
pub redis_pool: RedisPool, pub redis_pool: RedisPool,
pub search_config: labrinth::search::SearchConfig, pub search_config: labrinth::search::SearchConfig,
pub database_name: String, pub database_name: String,
@@ -74,6 +77,8 @@ impl TemporaryDatabase {
.await .await
.expect("Connection to temporary database failed"); .expect("Connection to temporary database failed");
let ro_pool = ReadOnlyPgPool::from(pool.clone());
println!("Running migrations on temporary database"); println!("Running migrations on temporary database");
// Performs migrations // Performs migrations
@@ -90,6 +95,7 @@ impl TemporaryDatabase {
search::SearchConfig::new(Some(temp_database_name.clone())); search::SearchConfig::new(Some(temp_database_name.clone()));
Self { Self {
pool, pool,
ro_pool,
database_name: temp_database_name, database_name: temp_database_name,
redis_pool, redis_pool,
search_config, search_config,
@@ -184,6 +190,7 @@ impl TemporaryDatabase {
let name = generate_random_name("test_template_"); let name = generate_random_name("test_template_");
let db = TemporaryDatabase { let db = TemporaryDatabase {
pool: pool.clone(), pool: pool.clone(),
ro_pool: ReadOnlyPgPool::from(pool.clone()),
database_name: TEMPLATE_DATABASE_NAME.to_string(), database_name: TEMPLATE_DATABASE_NAME.to_string(),
redis_pool: RedisPool::new(Some(name.clone())), redis_pool: RedisPool::new(Some(name.clone())),
search_config: search::SearchConfig::new(Some(name)), search_config: search::SearchConfig::new(Some(name)),

View File

@@ -26,6 +26,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig {
} }
let pool = db.pool.clone(); let pool = db.pool.clone();
let ro_pool = db.ro_pool.clone();
let redis_pool = db.redis_pool.clone(); let redis_pool = db.redis_pool.clone();
let search_config = db.search_config.clone(); let search_config = db.search_config.clone();
let file_host: Arc<dyn file_hosting::FileHost + Send + Sync> = let file_host: Arc<dyn file_hosting::FileHost + Send + Sync> =
@@ -40,6 +41,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig {
labrinth::app_setup( labrinth::app_setup(
pool.clone(), pool.clone(),
ro_pool.clone(),
redis_pool.clone(), redis_pool.clone(),
search_config, search_config,
&mut clickhouse, &mut clickhouse,