use std::sync::Arc; use std::time::Duration; use actix_web::web; use database::redis::RedisPool; use modrinth_maxmind::MaxMind; use queue::{ analytics::AnalyticsQueue, email::EmailQueue, payouts::PayoutsQueue, session::AuthQueue, socket::ActiveSockets, }; use sqlx::Postgres; use tracing::{info, warn}; extern crate clickhouse as clickhouse_crate; use clickhouse_crate::Client; use util::cors::default_cors; use util::gotenberg::GotenbergClient; use crate::background_task::update_versions; use crate::database::ReadOnlyPgPool; use crate::queue::billing::{index_billing, index_subscriptions}; use crate::queue::moderation::AutomatedModerationQueue; use crate::util::anrok; use crate::util::archon::ArchonClient; use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use sync::friends::handle_pubsub; pub mod auth; pub mod background_task; pub mod clickhouse; pub mod database; pub mod file_hosting; pub mod models; pub mod queue; pub mod routes; pub mod scheduler; pub mod search; pub mod sync; pub mod test; pub mod util; pub mod validate; #[derive(Clone)] pub struct Pepper { pub pepper: String, } #[derive(Clone)] pub struct LabrinthConfig { pub pool: sqlx::Pool, pub ro_pool: ReadOnlyPgPool, pub redis_pool: RedisPool, pub clickhouse: Client, pub file_host: Arc, pub maxmind: web::Data, pub scheduler: Arc, pub ip_salt: Pepper, pub search_config: search::SearchConfig, pub session_queue: web::Data, pub payouts_queue: web::Data, pub analytics_queue: Arc, pub active_sockets: web::Data, pub automated_moderation_queue: web::Data, pub rate_limiter: web::Data, pub stripe_client: stripe::Client, pub anrok_client: anrok::Client, pub email_queue: web::Data, pub archon_client: web::Data, pub gotenberg_client: GotenbergClient, } #[allow(clippy::too_many_arguments)] pub fn app_setup( pool: sqlx::Pool, ro_pool: ReadOnlyPgPool, redis_pool: RedisPool, search_config: search::SearchConfig, clickhouse: &mut Client, file_host: Arc, maxmind: MaxMind, stripe_client: stripe::Client, anrok_client: anrok::Client, email_queue: EmailQueue, gotenberg_client: GotenbergClient, enable_background_tasks: bool, ) -> LabrinthConfig { info!( "Starting labrinth on {}", dotenvy::var("BIND_ADDR").unwrap() ); let automated_moderation_queue = web::Data::new(AutomatedModerationQueue::default()); { let automated_moderation_queue_ref = automated_moderation_queue.clone(); let pool_ref = pool.clone(); let redis_pool_ref = redis_pool.clone(); actix_rt::spawn(async move { automated_moderation_queue_ref .task(pool_ref, redis_pool_ref) .await; }); } let scheduler = scheduler::Scheduler::new(); let limiter = web::Data::new(AsyncRateLimiter::new( redis_pool.clone(), GCRAParameters::new(300, 300), )); if enable_background_tasks { // The interval in seconds at which the local database is indexed // for searching. Defaults to 1 hour if unset. let local_index_interval = Duration::from_secs( parse_var("LOCAL_INDEX_INTERVAL").unwrap_or(3600), ); let pool_ref = pool.clone(); let search_config_ref = search_config.clone(); let redis_pool_ref = redis_pool.clone(); scheduler.run(local_index_interval, move || { let pool_ref = pool_ref.clone(); let redis_pool_ref = redis_pool_ref.clone(); let search_config_ref = search_config_ref.clone(); async move { background_task::index_search( pool_ref, redis_pool_ref, search_config_ref, ) .await; } }); // Changes statuses of scheduled projects/versions let pool_ref = pool.clone(); // TODO: Clear cache when these are run scheduler.run(Duration::from_secs(60 * 5), move || { let pool_ref = pool_ref.clone(); async move { background_task::release_scheduled(pool_ref).await; } }); let version_index_interval = Duration::from_secs( parse_var("VERSION_INDEX_INTERVAL").unwrap_or(1800), ); let pool_ref = pool.clone(); let redis_pool_ref = redis_pool.clone(); scheduler.run(version_index_interval, move || { let pool_ref = pool_ref.clone(); let redis = redis_pool_ref.clone(); async move { update_versions(pool_ref, redis).await; } }); let pool_ref = pool.clone(); let client_ref = clickhouse.clone(); let redis_pool_ref = redis_pool.clone(); scheduler.run(Duration::from_secs(60 * 60 * 6), move || { let pool_ref = pool_ref.clone(); let client_ref = client_ref.clone(); let redis_ref = redis_pool_ref.clone(); async move { background_task::payouts(pool_ref, client_ref, redis_ref).await; } }); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); let stripe_client_ref = stripe_client.clone(); let anrok_client_ref = anrok_client.clone(); actix_rt::spawn(async move { loop { index_billing( stripe_client_ref.clone(), anrok_client_ref.clone(), pool_ref.clone(), redis_ref.clone(), ) .await; tokio::time::sleep(Duration::from_secs(60 * 5)).await; } }); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); let stripe_client_ref = stripe_client.clone(); let anrok_client_ref = anrok_client.clone(); actix_rt::spawn(async move { loop { index_subscriptions( pool_ref.clone(), redis_ref.clone(), stripe_client_ref.clone(), anrok_client_ref.clone(), ) .await; tokio::time::sleep(Duration::from_secs(60 * 5)).await; } }); } let session_queue = web::Data::new(AuthQueue::new()); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); let session_queue_ref = session_queue.clone(); scheduler.run(Duration::from_secs(60 * 30), move || { let pool_ref = pool_ref.clone(); let redis_ref = redis_ref.clone(); let session_queue_ref = session_queue_ref.clone(); async move { info!("Indexing sessions queue"); let result = session_queue_ref.index(&pool_ref, &redis_ref).await; if let Err(e) = result { warn!("Indexing sessions queue failed: {:?}", e); } info!("Done indexing sessions queue"); } }); let analytics_queue = Arc::new(AnalyticsQueue::new()); { let client_ref = clickhouse.clone(); let analytics_queue_ref = analytics_queue.clone(); let pool_ref = pool.clone(); let redis_ref = redis_pool.clone(); scheduler.run(Duration::from_secs(15), move || { let client_ref = client_ref.clone(); let analytics_queue_ref = analytics_queue_ref.clone(); let pool_ref = pool_ref.clone(); let redis_ref = redis_ref.clone(); async move { info!("Indexing analytics queue"); let result = analytics_queue_ref .index(client_ref, &redis_ref, &pool_ref) .await; if let Err(e) = result { warn!("Indexing analytics queue failed: {:?}", e); } info!("Done indexing analytics queue"); } }); } let ip_salt = Pepper { pepper: ariadne::ids::Base62Id(ariadne::ids::random_base62(11)) .to_string(), }; let active_sockets = web::Data::new(ActiveSockets::default()); { let pool = pool.clone(); let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap(); let sockets = active_sockets.clone(); actix_rt::spawn(async move { let pubsub = redis_client.get_async_pubsub().await.unwrap(); handle_pubsub(pubsub, pool, sockets).await; }); } LabrinthConfig { pool, ro_pool, redis_pool, clickhouse: clickhouse.clone(), file_host, maxmind: web::Data::new(maxmind), scheduler: Arc::new(scheduler), ip_salt, search_config, session_queue, payouts_queue: web::Data::new(PayoutsQueue::new()), analytics_queue, active_sockets, automated_moderation_queue, rate_limiter: limiter, stripe_client, anrok_client, gotenberg_client, archon_client: web::Data::new( ArchonClient::from_env() .expect("ARCHON_URL and PYRO_API_KEY must be set"), ), email_queue: web::Data::new(email_queue), } } pub fn app_config( cfg: &mut web::ServiceConfig, labrinth_config: LabrinthConfig, ) { cfg.app_data(web::FormConfig::default().error_handler(|err, _req| { routes::ApiError::Validation(err.to_string()).into() })) .app_data(web::PathConfig::default().error_handler(|err, _req| { routes::ApiError::Validation(err.to_string()).into() })) .app_data(web::QueryConfig::default().error_handler(|err, _req| { routes::ApiError::Validation(err.to_string()).into() })) .app_data(web::JsonConfig::default().error_handler(|err, _req| { routes::ApiError::Validation(err.to_string()).into() })) .app_data(web::Data::new(labrinth_config.redis_pool.clone())) .app_data(web::Data::new(labrinth_config.pool.clone())) .app_data(web::Data::new(labrinth_config.ro_pool.clone())) .app_data(web::Data::new(labrinth_config.file_host.clone())) .app_data(web::Data::new(labrinth_config.search_config.clone())) .app_data(web::Data::new(labrinth_config.gotenberg_client.clone())) .app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.payouts_queue.clone()) .app_data(labrinth_config.email_queue.clone()) .app_data(labrinth_config.maxmind.clone()) .app_data(web::Data::new(labrinth_config.ip_salt.clone())) .app_data(web::Data::new(labrinth_config.analytics_queue.clone())) .app_data(web::Data::new(labrinth_config.clickhouse.clone())) .app_data(labrinth_config.active_sockets.clone()) .app_data(labrinth_config.automated_moderation_queue.clone()) .app_data(labrinth_config.archon_client.clone()) .app_data(web::Data::new(labrinth_config.stripe_client.clone())) .app_data(web::Data::new(labrinth_config.anrok_client.clone())) .app_data(labrinth_config.rate_limiter.clone()) .configure({ #[cfg(target_os = "linux")] { |cfg| routes::debug::config(cfg) } #[cfg(not(target_os = "linux"))] { |_cfg| () } }) .configure(routes::v2::config) .configure(routes::v3::config) .configure(routes::internal::config) .configure(routes::root_config) .default_service(web::get().wrap(default_cors()).to(routes::not_found)); } pub fn utoipa_app_config( cfg: &mut utoipa_actix_web::service_config::ServiceConfig, _labrinth_config: LabrinthConfig, ) { cfg.configure(routes::v3::utoipa_config); } // This is so that env vars not used immediately don't panic at runtime pub fn check_env_vars() -> bool { let mut failed = false; fn check_var(var: &str) -> bool { let check = parse_var::(var).is_none(); if check { warn!( "Variable `{}` missing in dotenv or not of type `{}`", var, std::any::type_name::() ); } check } failed |= check_var::("SITE_URL"); failed |= check_var::("CDN_URL"); failed |= check_var::("LABRINTH_ADMIN_KEY"); failed |= check_var::("LABRINTH_EXTERNAL_NOTIFICATION_KEY"); failed |= check_var::("RATE_LIMIT_IGNORE_KEY"); failed |= check_var::("DATABASE_URL"); failed |= check_var::("MEILISEARCH_ADDR"); failed |= check_var::("MEILISEARCH_KEY"); failed |= check_var::("REDIS_URL"); failed |= check_var::("BIND_ADDR"); failed |= check_var::("SELF_ADDR"); failed |= check_var::("STORAGE_BACKEND"); let storage_backend = dotenvy::var("STORAGE_BACKEND").ok(); match storage_backend.as_deref() { Some("s3") => { let mut check_var_set = |var_prefix| { failed |= check_var::(&format!( "S3_{var_prefix}_BUCKET_NAME" )); failed |= check_var::(&format!( "S3_{var_prefix}_USES_PATH_STYLE_BUCKET" )); failed |= check_var::(&format!("S3_{var_prefix}_REGION")); failed |= check_var::(&format!("S3_{var_prefix}_URL")); failed |= check_var::(&format!( "S3_{var_prefix}_ACCESS_TOKEN" )); failed |= check_var::(&format!("S3_{var_prefix}_SECRET")); }; check_var_set("PUBLIC"); check_var_set("PRIVATE"); } Some("local") => { failed |= check_var::("MOCK_FILE_PATH"); } Some(backend) => { warn!( "Variable `STORAGE_BACKEND` contains an invalid value: {backend}. Expected \"s3\" or \"local\"." ); failed |= true; } _ => { warn!("Variable `STORAGE_BACKEND` is not set!"); failed |= true; } } failed |= check_var::("LOCAL_INDEX_INTERVAL"); failed |= check_var::("VERSION_INDEX_INTERVAL"); if parse_strings_from_var("WHITELISTED_MODPACK_DOMAINS").is_none() { warn!( "Variable `WHITELISTED_MODPACK_DOMAINS` missing in dotenv or not a json array of strings" ); failed |= true; } if parse_strings_from_var("ALLOWED_CALLBACK_URLS").is_none() { warn!( "Variable `ALLOWED_CALLBACK_URLS` missing in dotenv or not a json array of strings" ); failed |= true; } failed |= check_var::("GITHUB_CLIENT_ID"); failed |= check_var::("GITHUB_CLIENT_SECRET"); failed |= check_var::("GITLAB_CLIENT_ID"); failed |= check_var::("GITLAB_CLIENT_SECRET"); failed |= check_var::("DISCORD_CLIENT_ID"); failed |= check_var::("DISCORD_CLIENT_SECRET"); failed |= check_var::("MICROSOFT_CLIENT_ID"); failed |= check_var::("MICROSOFT_CLIENT_SECRET"); failed |= check_var::("GOOGLE_CLIENT_ID"); failed |= check_var::("GOOGLE_CLIENT_SECRET"); failed |= check_var::("STEAM_API_KEY"); failed |= check_var::("TREMENDOUS_API_URL"); failed |= check_var::("TREMENDOUS_API_KEY"); failed |= check_var::("TREMENDOUS_PRIVATE_KEY"); failed |= check_var::("PAYPAL_API_URL"); failed |= check_var::("PAYPAL_WEBHOOK_ID"); failed |= check_var::("PAYPAL_CLIENT_ID"); failed |= check_var::("PAYPAL_CLIENT_SECRET"); failed |= check_var::("PAYPAL_NVP_USERNAME"); failed |= check_var::("PAYPAL_NVP_PASSWORD"); failed |= check_var::("PAYPAL_NVP_SIGNATURE"); failed |= check_var::("HCAPTCHA_SECRET"); failed |= check_var::("SMTP_USERNAME"); failed |= check_var::("SMTP_PASSWORD"); failed |= check_var::("SMTP_HOST"); failed |= check_var::("SMTP_PORT"); failed |= check_var::("SMTP_TLS"); failed |= check_var::("SMTP_FROM_NAME"); failed |= check_var::("SMTP_FROM_ADDRESS"); failed |= check_var::("SITE_VERIFY_EMAIL_PATH"); failed |= check_var::("SITE_RESET_PASSWORD_PATH"); failed |= check_var::("SITE_BILLING_PATH"); failed |= check_var::("SENDY_URL"); failed |= check_var::("SENDY_LIST_ID"); failed |= check_var::("SENDY_API_KEY"); if parse_strings_from_var("ANALYTICS_ALLOWED_ORIGINS").is_none() { warn!( "Variable `ANALYTICS_ALLOWED_ORIGINS` missing in dotenv or not a json array of strings" ); failed |= true; } failed |= check_var::("CLICKHOUSE_REPLICATED"); failed |= check_var::("CLICKHOUSE_URL"); failed |= check_var::("CLICKHOUSE_USER"); failed |= check_var::("CLICKHOUSE_PASSWORD"); failed |= check_var::("CLICKHOUSE_DATABASE"); failed |= check_var::("MAXMIND_ACCOUNT_ID"); failed |= check_var::("MAXMIND_LICENSE_KEY"); failed |= check_var::("FLAME_ANVIL_URL"); failed |= check_var::("GOTENBERG_URL"); failed |= check_var::("GOTENBERG_CALLBACK_BASE"); failed |= check_var::("STRIPE_API_KEY"); failed |= check_var::("STRIPE_WEBHOOK_SECRET"); failed |= check_var::("ADITUDE_API_KEY"); failed |= check_var::("PYRO_API_KEY"); failed |= check_var::("BREX_API_URL"); failed |= check_var::("BREX_API_KEY"); failed |= check_var::("DELPHI_URL"); failed |= check_var::("AVALARA_1099_API_URL"); failed |= check_var::("AVALARA_1099_API_KEY"); failed |= check_var::("AVALARA_1099_API_TEAM_ID"); failed |= check_var::("AVALARA_1099_COMPANY_ID"); failed |= check_var::("ANROK_API_URL"); failed |= check_var::("ANROK_API_KEY"); failed |= check_var::("COMPLIANCE_PAYOUT_THRESHOLD"); failed |= check_var::("PAYOUT_ALERT_SLACK_WEBHOOK"); failed |= check_var::("ARCHON_URL"); failed |= check_var::("MURALPAY_API_URL"); failed |= check_var::("MURALPAY_API_KEY"); failed |= check_var::("MURALPAY_TRANSFER_API_KEY"); failed |= check_var::("MURALPAY_SOURCE_ACCOUNT_ID"); failed }