// Source: AstralRinth/apps/labrinth/src/lib.rs (541 lines, 19 KiB, Rust)

use std::sync::Arc;
use std::time::Duration;
use actix_web::web;
use database::redis::RedisPool;
use modrinth_maxmind::MaxMind;
use queue::{
analytics::AnalyticsQueue, email::EmailQueue, payouts::PayoutsQueue,
session::AuthQueue, socket::ActiveSockets,
};
use sqlx::Postgres;
use tracing::{debug, info, warn};
extern crate clickhouse as clickhouse_crate;
use clickhouse_crate::Client;
use util::cors::default_cors;
use util::gotenberg::GotenbergClient;
use crate::background_task::update_versions;
use crate::database::ReadOnlyPgPool;
use crate::queue::billing::{index_billing, index_subscriptions};
use crate::queue::moderation::AutomatedModerationQueue;
use crate::util::anrok;
use crate::util::archon::ArchonClient;
use crate::util::env::{parse_strings_from_var, parse_var};
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
use sync::friends::handle_pubsub;
pub mod auth;
pub mod background_task;
pub mod clickhouse;
pub mod database;
pub mod file_hosting;
pub mod models;
pub mod queue;
pub mod routes;
pub mod scheduler;
pub mod search;
pub mod sync;
pub mod util;
pub mod validate;
#[cfg(feature = "test")]
pub mod test;
/// Newtype-style holder for a single secret/salt string.
///
/// In this file it backs [`LabrinthConfig::ip_salt`], which `app_setup`
/// fills with a freshly generated random base62 identifier.
#[derive(Clone)]
pub struct Pepper {
    /// The raw pepper value.
    pub pepper: String,
}
#[derive(Clone)]
pub struct LabrinthConfig {
pub pool: sqlx::Pool<Postgres>,
pub ro_pool: ReadOnlyPgPool,
pub redis_pool: RedisPool,
pub clickhouse: Client,
pub file_host: Arc<dyn file_hosting::FileHost + Send + Sync>,
pub maxmind: web::Data<MaxMind>,
pub scheduler: Arc<scheduler::Scheduler>,
pub ip_salt: Pepper,
pub search_config: search::SearchConfig,
pub session_queue: web::Data<AuthQueue>,
pub payouts_queue: web::Data<PayoutsQueue>,
pub analytics_queue: Arc<AnalyticsQueue>,
pub active_sockets: web::Data<ActiveSockets>,
pub automated_moderation_queue: web::Data<AutomatedModerationQueue>,
pub rate_limiter: web::Data<AsyncRateLimiter>,
pub stripe_client: stripe::Client,
pub anrok_client: anrok::Client,
pub email_queue: web::Data<EmailQueue>,
pub archon_client: web::Data<ArchonClient>,
pub gotenberg_client: GotenbergClient,
}
/// Wires up all shared services, starts background work, and returns the
/// resulting [`LabrinthConfig`].
///
/// Always started, regardless of `enable_background_tasks`:
/// - the automated moderation queue worker (spawned task),
/// - session queue indexing (every 30 minutes),
/// - analytics queue indexing (every 15 seconds),
/// - the Redis pub/sub handler for the active sockets (spawned task).
///
/// Started only when `enable_background_tasks` is `true`:
/// - search indexing (`LOCAL_INDEX_INTERVAL` seconds, 3600 if unparsable),
/// - release of scheduled projects/versions (every 5 minutes),
/// - version updates (`VERSION_INDEX_INTERVAL` seconds, 1800 if unparsable),
/// - payouts processing (every 6 hours),
/// - billing and subscription indexing loops (5 minute pause between runs).
///
/// `clickhouse` is taken by mutable reference but is only ever cloned here.
///
/// # Panics
///
/// - if `BIND_ADDR` cannot be read when the startup log line is evaluated,
/// - if a Redis client cannot be opened from `redis_pool.url`,
/// - if `ArchonClient::from_env()` returns an error.
///
/// The spawned pub/sub task additionally panics if it cannot obtain an async
/// pub/sub connection.
// The argument list mirrors the independent services being injected, so the
// `too_many_arguments` lint is silenced rather than inventing a wrapper type.
#[allow(clippy::too_many_arguments)]
pub fn app_setup(
    pool: sqlx::Pool<Postgres>,
    ro_pool: ReadOnlyPgPool,
    redis_pool: RedisPool,
    search_config: search::SearchConfig,
    clickhouse: &mut Client,
    file_host: Arc<dyn file_hosting::FileHost + Send + Sync>,
    maxmind: MaxMind,
    stripe_client: stripe::Client,
    anrok_client: anrok::Client,
    email_queue: EmailQueue,
    gotenberg_client: GotenbergClient,
    enable_background_tasks: bool,
) -> LabrinthConfig {
    info!(
        "Starting labrinth on {}",
        dotenvy::var("BIND_ADDR").unwrap()
    );

    // Automated moderation: one long-lived worker task.
    let automated_moderation_queue =
        web::Data::new(AutomatedModerationQueue::default());
    {
        let (moderation, db, cache) = (
            automated_moderation_queue.clone(),
            pool.clone(),
            redis_pool.clone(),
        );
        actix_rt::spawn(async move {
            moderation.task(db, cache).await;
        });
    }

    let task_scheduler = scheduler::Scheduler::new();

    let rate_limiter = web::Data::new(AsyncRateLimiter::new(
        redis_pool.clone(),
        GCRAParameters::new(300, 300),
    ));

    if enable_background_tasks {
        // Search indexing. The interval is in seconds and falls back to one
        // hour when `LOCAL_INDEX_INTERVAL` cannot be parsed.
        {
            let every = Duration::from_secs(
                parse_var("LOCAL_INDEX_INTERVAL").unwrap_or(3600),
            );
            let (db, cache, search) =
                (pool.clone(), redis_pool.clone(), search_config.clone());
            task_scheduler.run(every, move || {
                let (db, cache, search) =
                    (db.clone(), cache.clone(), search.clone());
                async move {
                    background_task::index_search(db, cache, search).await;
                }
            });
        }

        // Changes statuses of scheduled projects/versions.
        // TODO: Clear cache when these are run
        {
            let db = pool.clone();
            task_scheduler.run(Duration::from_secs(60 * 5), move || {
                let db = db.clone();
                async move {
                    background_task::release_scheduled(db).await;
                }
            });
        }

        // Version updates; falls back to 30 minutes when
        // `VERSION_INDEX_INTERVAL` cannot be parsed.
        {
            let every = Duration::from_secs(
                parse_var("VERSION_INDEX_INTERVAL").unwrap_or(1800),
            );
            let (db, cache) = (pool.clone(), redis_pool.clone());
            task_scheduler.run(every, move || {
                let (db, cache) = (db.clone(), cache.clone());
                async move {
                    update_versions(db, cache).await;
                }
            });
        }

        // Payouts, every six hours.
        {
            let (db, analytics_db, cache) =
                (pool.clone(), clickhouse.clone(), redis_pool.clone());
            task_scheduler.run(Duration::from_secs(60 * 60 * 6), move || {
                let (db, analytics_db, cache) =
                    (db.clone(), analytics_db.clone(), cache.clone());
                async move {
                    background_task::payouts(db, analytics_db, cache).await;
                }
            });
        }

        // Billing indexing: run, pause five minutes, repeat forever.
        {
            let (stripe_api, anrok_api, db, cache) = (
                stripe_client.clone(),
                anrok_client.clone(),
                pool.clone(),
                redis_pool.clone(),
            );
            actix_rt::spawn(async move {
                loop {
                    index_billing(
                        stripe_api.clone(),
                        anrok_api.clone(),
                        db.clone(),
                        cache.clone(),
                    )
                    .await;
                    tokio::time::sleep(Duration::from_secs(60 * 5)).await;
                }
            });
        }

        // Subscription indexing: same run/pause loop as billing.
        {
            let (db, cache, stripe_api, anrok_api) = (
                pool.clone(),
                redis_pool.clone(),
                stripe_client.clone(),
                anrok_client.clone(),
            );
            actix_rt::spawn(async move {
                loop {
                    index_subscriptions(
                        db.clone(),
                        cache.clone(),
                        stripe_api.clone(),
                        anrok_api.clone(),
                    )
                    .await;
                    tokio::time::sleep(Duration::from_secs(60 * 5)).await;
                }
            });
        }
    }

    // Session queue indexing, every 30 minutes.
    let session_queue = web::Data::new(AuthQueue::new());
    {
        let (db, cache, sessions) =
            (pool.clone(), redis_pool.clone(), session_queue.clone());
        task_scheduler.run(Duration::from_secs(60 * 30), move || {
            let (db, cache, sessions) =
                (db.clone(), cache.clone(), sessions.clone());
            async move {
                info!("Indexing sessions queue");
                if let Err(e) = sessions.index(&db, &cache).await {
                    warn!("Indexing sessions queue failed: {:?}", e);
                }
                info!("Done indexing sessions queue");
            }
        });
    }

    // Analytics queue indexing, every 15 seconds.
    let analytics_queue = Arc::new(AnalyticsQueue::new());
    {
        let (analytics_db, analytics, db, cache) = (
            clickhouse.clone(),
            analytics_queue.clone(),
            pool.clone(),
            redis_pool.clone(),
        );
        task_scheduler.run(Duration::from_secs(15), move || {
            let (analytics_db, analytics, db, cache) = (
                analytics_db.clone(),
                analytics.clone(),
                db.clone(),
                cache.clone(),
            );
            async move {
                debug!("Indexing analytics queue");
                if let Err(e) = analytics.index(analytics_db, &cache, &db).await
                {
                    warn!("Indexing analytics queue failed: {:?}", e);
                }
                debug!("Done indexing analytics queue");
            }
        });
    }

    // A fresh random salt is generated on every call.
    let salt_id = ariadne::ids::Base62Id(ariadne::ids::random_base62(11));
    let ip_salt = Pepper {
        pepper: salt_id.to_string(),
    };

    // Redis pub/sub handler feeding the active sockets registry.
    let active_sockets = web::Data::new(ActiveSockets::default());
    {
        let db = pool.clone();
        let pubsub_client =
            redis::Client::open(redis_pool.url.clone()).unwrap();
        let sockets = active_sockets.clone();
        actix_rt::spawn(async move {
            let pubsub = pubsub_client.get_async_pubsub().await.unwrap();
            handle_pubsub(pubsub, db, sockets).await;
        });
    }

    LabrinthConfig {
        pool,
        ro_pool,
        redis_pool,
        clickhouse: clickhouse.clone(),
        file_host,
        maxmind: web::Data::new(maxmind),
        scheduler: Arc::new(task_scheduler),
        ip_salt,
        search_config,
        session_queue,
        payouts_queue: web::Data::new(PayoutsQueue::new()),
        analytics_queue,
        active_sockets,
        automated_moderation_queue,
        rate_limiter,
        stripe_client,
        anrok_client,
        gotenberg_client,
        archon_client: web::Data::new(
            ArchonClient::from_env()
                .expect("ARCHON_URL and PYRO_API_KEY must be set"),
        ),
        email_queue: web::Data::new(email_queue),
    }
}
/// Registers shared state and every route group on an actix-web
/// `ServiceConfig`.
///
/// Steps, in order:
/// 1. install error handlers on the form, path, query and JSON extractor
///    configs so extraction failures become `routes::ApiError::Validation`;
/// 2. register the handles from `labrinth_config` as application data
///    (plain values are wrapped in `web::Data`, values that are already
///    `web::Data` are registered as-is);
/// 3. mount the debug routes (Linux targets only), then the v2, v3, internal
///    and root route groups;
/// 4. install a CORS-wrapped default service for GET requests that answers
///    with `routes::not_found`.
pub fn app_config(
    cfg: &mut web::ServiceConfig,
    labrinth_config: LabrinthConfig,
) {
    // Extractor configuration: every extraction failure becomes a validation error.
    cfg.app_data(web::FormConfig::default().error_handler(|err, _req| {
        routes::ApiError::Validation(err.to_string()).into()
    }));
    cfg.app_data(web::PathConfig::default().error_handler(|err, _req| {
        routes::ApiError::Validation(err.to_string()).into()
    }));
    cfg.app_data(web::QueryConfig::default().error_handler(|err, _req| {
        routes::ApiError::Validation(err.to_string()).into()
    }));
    cfg.app_data(web::JsonConfig::default().error_handler(|err, _req| {
        routes::ApiError::Validation(err.to_string()).into()
    }));

    // Shared state, registered in the same order as before.
    cfg.app_data(web::Data::new(labrinth_config.redis_pool.clone()));
    cfg.app_data(web::Data::new(labrinth_config.pool.clone()));
    cfg.app_data(web::Data::new(labrinth_config.ro_pool.clone()));
    cfg.app_data(web::Data::new(labrinth_config.file_host.clone()));
    cfg.app_data(web::Data::new(labrinth_config.search_config.clone()));
    cfg.app_data(web::Data::new(labrinth_config.gotenberg_client.clone()));
    cfg.app_data(labrinth_config.session_queue.clone());
    cfg.app_data(labrinth_config.payouts_queue.clone());
    cfg.app_data(labrinth_config.email_queue.clone());
    cfg.app_data(labrinth_config.maxmind.clone());
    cfg.app_data(web::Data::new(labrinth_config.ip_salt.clone()));
    cfg.app_data(web::Data::new(labrinth_config.analytics_queue.clone()));
    cfg.app_data(web::Data::new(labrinth_config.clickhouse.clone()));
    cfg.app_data(labrinth_config.active_sockets.clone());
    cfg.app_data(labrinth_config.automated_moderation_queue.clone());
    cfg.app_data(labrinth_config.archon_client.clone());
    cfg.app_data(web::Data::new(labrinth_config.stripe_client.clone()));
    cfg.app_data(web::Data::new(labrinth_config.anrok_client.clone()));
    cfg.app_data(labrinth_config.rate_limiter.clone());

    // Debug routes are only mounted on Linux; elsewhere a no-op is configured.
    #[cfg(target_os = "linux")]
    cfg.configure(|cfg| routes::debug::config(cfg));
    #[cfg(not(target_os = "linux"))]
    cfg.configure(|_cfg| ());

    // Versioned, internal and root route groups.
    cfg.configure(routes::v2::config);
    cfg.configure(routes::v3::config);
    cfg.configure(routes::internal::config);
    cfg.configure(routes::root_config);

    // Fallback for GET requests that match nothing else.
    cfg.default_service(
        web::get().wrap(default_cors()).to(routes::not_found),
    );
}
/// Registers the utoipa-aware route groups (v3 and internal) on a
/// `utoipa_actix_web` service config.
///
/// `_labrinth_config` is accepted but not used by this function.
pub fn utoipa_app_config(
    cfg: &mut utoipa_actix_web::service_config::ServiceConfig,
    _labrinth_config: LabrinthConfig,
) {
    cfg.configure(routes::v3::utoipa_config);
    cfg.configure(routes::internal::utoipa_config);
}
/// Checks that every expected environment variable is present and parses as
/// the expected type.
///
/// This exists so that variables which are not read immediately do not cause
/// a panic later at runtime. Every check is always executed (no
/// short-circuiting), and each failing variable produces one warning.
///
/// Returns `true` if at least one check failed, `false` if all passed.
pub fn check_env_vars() -> bool {
    /// Returns `true` when `var` is missing or cannot be parsed as `T`,
    /// emitting a warning in that case.
    fn check_var<T: std::str::FromStr>(var: &str) -> bool {
        let missing = parse_var::<T>(var).is_none();
        if missing {
            warn!(
                "Variable `{}` missing in dotenv or not of type `{}`",
                var,
                std::any::type_name::<T>()
            );
        }
        missing
    }

    /// Runs the `String` check over every name in order and reports whether
    /// any of them failed. Uses non-short-circuit `|` so that every variable
    /// is checked and warned about.
    fn check_strings(vars: &[&str]) -> bool {
        vars.iter()
            .fold(false, |any_failed, var| any_failed | check_var::<String>(var))
    }

    let mut failed = check_strings(&[
        "SITE_URL",
        "CDN_URL",
        "LABRINTH_ADMIN_KEY",
        "LABRINTH_EXTERNAL_NOTIFICATION_KEY",
        "RATE_LIMIT_IGNORE_KEY",
        "DATABASE_URL",
        "MEILISEARCH_ADDR",
        "MEILISEARCH_KEY",
        "REDIS_URL",
        "BIND_ADDR",
        "SELF_ADDR",
        "STORAGE_BACKEND",
    ]);

    // Storage backend specific variables.
    let backend_choice = dotenvy::var("STORAGE_BACKEND").ok();
    match backend_choice.as_deref() {
        Some("s3") => {
            // The same set of variables is required for both buckets.
            for var_prefix in ["PUBLIC", "PRIVATE"] {
                failed |= check_var::<String>(&format!(
                    "S3_{var_prefix}_BUCKET_NAME"
                ));
                failed |= check_var::<bool>(&format!(
                    "S3_{var_prefix}_USES_PATH_STYLE_BUCKET"
                ));
                failed |=
                    check_var::<String>(&format!("S3_{var_prefix}_REGION"));
                failed |= check_var::<String>(&format!("S3_{var_prefix}_URL"));
                failed |= check_var::<String>(&format!(
                    "S3_{var_prefix}_ACCESS_TOKEN"
                ));
                failed |=
                    check_var::<String>(&format!("S3_{var_prefix}_SECRET"));
            }
        }
        Some("local") => {
            failed |= check_var::<String>("MOCK_FILE_PATH");
        }
        Some(backend) => {
            warn!(
                "Variable `STORAGE_BACKEND` contains an invalid value: {backend}. Expected \"s3\" or \"local\"."
            );
            failed = true;
        }
        None => {
            warn!("Variable `STORAGE_BACKEND` is not set!");
            failed = true;
        }
    }

    // Indexing intervals.
    failed |= check_var::<usize>("LOCAL_INDEX_INTERVAL");
    failed |= check_var::<usize>("VERSION_INDEX_INTERVAL");

    // JSON string-array variables.
    if parse_strings_from_var("WHITELISTED_MODPACK_DOMAINS").is_none() {
        warn!(
            "Variable `WHITELISTED_MODPACK_DOMAINS` missing in dotenv or not a json array of strings"
        );
        failed = true;
    }
    if parse_strings_from_var("ALLOWED_CALLBACK_URLS").is_none() {
        warn!(
            "Variable `ALLOWED_CALLBACK_URLS` missing in dotenv or not a json array of strings"
        );
        failed = true;
    }

    // OAuth providers, payout providers, captcha and the first SMTP settings.
    failed |= check_strings(&[
        "GITHUB_CLIENT_ID",
        "GITHUB_CLIENT_SECRET",
        "GITLAB_CLIENT_ID",
        "GITLAB_CLIENT_SECRET",
        "DISCORD_CLIENT_ID",
        "DISCORD_CLIENT_SECRET",
        "MICROSOFT_CLIENT_ID",
        "MICROSOFT_CLIENT_SECRET",
        "GOOGLE_CLIENT_ID",
        "GOOGLE_CLIENT_SECRET",
        "STEAM_API_KEY",
        "TREMENDOUS_API_URL",
        "TREMENDOUS_API_KEY",
        "TREMENDOUS_PRIVATE_KEY",
        "PAYPAL_API_URL",
        "PAYPAL_WEBHOOK_ID",
        "PAYPAL_CLIENT_ID",
        "PAYPAL_CLIENT_SECRET",
        "PAYPAL_NVP_USERNAME",
        "PAYPAL_NVP_PASSWORD",
        "PAYPAL_NVP_SIGNATURE",
        "HCAPTCHA_SECRET",
        "SMTP_USERNAME",
        "SMTP_PASSWORD",
        "SMTP_HOST",
    ]);
    failed |= check_var::<u16>("SMTP_PORT");
    failed |= check_strings(&[
        "SMTP_TLS",
        "SMTP_FROM_NAME",
        "SMTP_FROM_ADDRESS",
        "SITE_VERIFY_EMAIL_PATH",
        "SITE_RESET_PASSWORD_PATH",
        "SITE_BILLING_PATH",
        "SENDY_URL",
        "SENDY_LIST_ID",
        "SENDY_API_KEY",
    ]);

    if parse_strings_from_var("ANALYTICS_ALLOWED_ORIGINS").is_none() {
        warn!(
            "Variable `ANALYTICS_ALLOWED_ORIGINS` missing in dotenv or not a json array of strings"
        );
        failed = true;
    }

    // ClickHouse and the remaining third-party integrations.
    failed |= check_var::<bool>("CLICKHOUSE_REPLICATED");
    failed |= check_strings(&[
        "CLICKHOUSE_URL",
        "CLICKHOUSE_USER",
        "CLICKHOUSE_PASSWORD",
        "CLICKHOUSE_DATABASE",
        "MAXMIND_ACCOUNT_ID",
        "MAXMIND_LICENSE_KEY",
        "FLAME_ANVIL_URL",
        "GOTENBERG_URL",
        "GOTENBERG_CALLBACK_BASE",
        "GOTENBERG_TIMEOUT",
        "STRIPE_API_KEY",
        "STRIPE_WEBHOOK_SECRET",
        "ADITUDE_API_KEY",
        "PYRO_API_KEY",
        "BREX_API_URL",
        "BREX_API_KEY",
        "DELPHI_URL",
        "AVALARA_1099_API_URL",
        "AVALARA_1099_API_KEY",
        "AVALARA_1099_API_TEAM_ID",
        "AVALARA_1099_COMPANY_ID",
        "ANROK_API_URL",
        "ANROK_API_KEY",
        "COMPLIANCE_PAYOUT_THRESHOLD",
        "PAYOUT_ALERT_SLACK_WEBHOOK",
        "ARCHON_URL",
        "MURALPAY_API_URL",
        "MURALPAY_API_KEY",
        "MURALPAY_TRANSFER_API_KEY",
        "MURALPAY_SOURCE_ACCOUNT_ID",
    ]);

    failed
}