use eyre::Context; use prometheus::{IntGauge, Registry}; use sqlx::migrate::{MigrateDatabase, Migrator}; use sqlx::postgres::PgPoolOptions; use sqlx::{Connection, Postgres}; use std::ops::{Deref, DerefMut}; use std::time::Duration; use tracing::info; // TODO tracing spans pub type PgPool = sqlx_tracing::Pool; pub type PgTransaction<'c> = sqlx_tracing::Transaction<'c, Postgres>; pub use sqlx_tracing::Acquire; pub use sqlx_tracing::Executor; use crate::env::ENV; // pub type PgPool = sqlx::PgPool; // pub type PgTransaction<'c> = sqlx::Transaction<'c, Postgres>; // pub use sqlx::Acquire; // pub use sqlx::Executor; #[derive(Clone)] #[repr(transparent)] pub struct ReadOnlyPgPool(PgPool); impl From for ReadOnlyPgPool { fn from(pool: PgPool) -> Self { ReadOnlyPgPool(pool) } } impl ReadOnlyPgPool { pub fn into_inner(self) -> PgPool { self.0 } } impl Deref for ReadOnlyPgPool { type Target = PgPool; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for ReadOnlyPgPool { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> { info!("Initializing database connection"); let database_url = &ENV.DATABASE_URL; let acquire_timeout = Duration::from_millis(ENV.DATABASE_ACQUIRE_TIMEOUT_MS); let pool = PgPoolOptions::new() .acquire_timeout(acquire_timeout) .min_connections(ENV.DATABASE_MIN_CONNECTIONS) .max_connections(ENV.DATABASE_MAX_CONNECTIONS) .max_lifetime(Some(Duration::from_secs(60 * 60))) .connect(database_url) .await?; let pool = PgPool::from(pool); if !ENV.READONLY_DATABASE_URL.is_empty() { let ro_pool = PgPoolOptions::new() .acquire_timeout(acquire_timeout) .min_connections(ENV.READONLY_DATABASE_MIN_CONNECTIONS) .max_connections(ENV.READONLY_DATABASE_MAX_CONNECTIONS) .max_lifetime(Some(Duration::from_secs(60 * 60))) .connect(&ENV.READONLY_DATABASE_URL) .await?; let ro_pool = PgPool::from(ro_pool); Ok((pool, ReadOnlyPgPool(ro_pool))) } else { let ro = ReadOnlyPgPool(pool.clone()); Ok((pool, ro)) } } pub async fn check_for_migrations() -> eyre::Result<()> { let uri = &ENV.DATABASE_URL; let uri = uri.as_str(); if !Postgres::database_exists(uri) .await .wrap_err("failed to check if database exists")? { info!("Creating database..."); Postgres::create_database(uri) .await .wrap_err("failed to create database")?; } info!("Applying migrations..."); let mut conn: sqlx::PgConnection = sqlx::PgConnection::connect(uri) .await .wrap_err("failed to connect to database")?; MIGRATOR .run(&mut conn) .await .wrap_err("failed to run database migrations")?; Ok(()) } pub static MIGRATOR: Migrator = sqlx::migrate!(); pub async fn register_and_set_metrics( pool: &PgPool, registry: &Registry, ) -> Result<(), prometheus::Error> { let pg_pool_size = IntGauge::new("labrinth_pg_pool_size", "Size of Postgres pool")?; let pg_pool_idle = IntGauge::new( "labrinth_pg_pool_idle", "Number of idle Postgres connections", )?; registry.register(Box::new(pg_pool_size.clone()))?; registry.register(Box::new(pg_pool_idle.clone()))?; let pool_ref = pool.clone(); tokio::spawn(async move { loop { pg_pool_size.set(pool_ref.size() as i64); pg_pool_idle.set(pool_ref.num_idle() as i64); tokio::time::sleep(Duration::from_secs(5)).await; } }); Ok(()) }