From bd918c76169985252e7c170b0bd103655925fd20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois-Xavier=20Talbot?= <108630700+fetchfern@users.noreply.github.com> Date: Wed, 3 Sep 2025 14:12:34 +0100 Subject: [PATCH] Move update_bank_balances to billing task, don't fail every provider if one fails (#4332) --- apps/labrinth/src/background_task.rs | 21 ++++++++++++---- apps/labrinth/src/lib.rs | 21 +--------------- apps/labrinth/src/queue/payouts.rs | 36 +++++++++++++++++++++------- 3 files changed, 45 insertions(+), 33 deletions(-) diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 35d8531b..57301ffa 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -1,10 +1,12 @@ use crate::database::redis::RedisPool; -use crate::queue::payouts::process_payout; +use crate::queue::payouts::{ + PayoutsQueue, insert_bank_balances, process_payout, +}; use crate::search::indexing::index_projects; use crate::{database, search}; use clap::ValueEnum; use sqlx::Postgres; -use tracing::{info, warn}; +use tracing::{error, info, warn}; #[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)] #[clap(rename_all = "kebab_case")] @@ -37,10 +39,12 @@ impl BackgroundTask { IndexBilling => { crate::routes::internal::billing::index_billing( stripe_client, - pool, + pool.clone(), redis_pool, ) - .await + .await; + + update_bank_balances(pool).await; } IndexSubscriptions => { crate::routes::internal::billing::index_subscriptions( @@ -52,6 +56,15 @@ impl BackgroundTask { } } +pub async fn update_bank_balances(pool: sqlx::Pool) { + let payouts_queue = PayoutsQueue::new(); + + match insert_bank_balances(&payouts_queue, &pool).await { + Ok(_) => info!("Bank balances updated successfully"), + Err(error) => error!(%error, "Bank balances update failed"), + } +} + pub async fn run_migrations() { database::check_for_migrations() .await diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 6fc0e8d8..c5f0b5be 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -16,7 +16,6 @@ use util::cors::default_cors; use crate::background_task::update_versions; use crate::queue::moderation::AutomatedModerationQueue; -use crate::queue::payouts::insert_bank_balances; use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use sync::friends::handle_pubsub; @@ -252,24 +251,6 @@ pub fn app_setup( .to_string(), }; - let payouts_queue = web::Data::new(PayoutsQueue::new()); - - let payouts_queue_ref = payouts_queue.clone(); - let pool_ref = pool.clone(); - scheduler.run(Duration::from_secs(60 * 60 * 6), move || { - let payouts_queue_ref = payouts_queue_ref.clone(); - let pool_ref = pool_ref.clone(); - async move { - info!("Started updating bank balances"); - let result = - insert_bank_balances(&payouts_queue_ref, &pool_ref).await; - if let Err(e) = result { - warn!("Bank balance update failed: {:?}", e); - } - info!("Done updating bank balances"); - } - }); - let active_sockets = web::Data::new(ActiveSockets::default()); { @@ -292,7 +273,7 @@ pub fn app_setup( ip_salt, search_config, session_queue, - payouts_queue, + payouts_queue: web::Data::new(PayoutsQueue::new()), analytics_queue, active_sockets, automated_moderation_queue, diff --git a/apps/labrinth/src/queue/payouts.rs b/apps/labrinth/src/queue/payouts.rs index ccdeb678..d814bf9c 100644 --- a/apps/labrinth/src/queue/payouts.rs +++ b/apps/labrinth/src/queue/payouts.rs @@ -17,6 +17,7 @@ use sqlx::PgPool; use sqlx::postgres::PgQueryResult; use std::collections::HashMap; use tokio::sync::RwLock; +use tracing::error; pub struct PayoutsQueue { credential: RwLock>, @@ -1078,12 +1079,23 @@ pub async fn insert_bank_balances( ) -> Result<(), ApiError> { let mut transaction = pool.begin().await?; - let (paypal, brex, tremendous) = futures::future::try_join3( - PayoutsQueue::get_paypal_balance(), - PayoutsQueue::get_brex_balance(), - payouts.get_tremendous_balance(), - ) - .await?; + let paypal = PayoutsQueue::get_paypal_balance() + .await + .inspect_err(|error| error!(%error, "Failure getting PayPal balance")) + .ok(); + + let brex = PayoutsQueue::get_brex_balance() + .await + .inspect_err(|error| error!(%error, "Failure getting Brex balance")) + .ok(); + + let tremendous = payouts + .get_tremendous_balance() + .await + .inspect_err( + |error| error!(%error, "Failure getting Tremendous balance"), + ) + .ok(); let mut insert_account_types = Vec::new(); let mut insert_amounts = Vec::new(); @@ -1108,9 +1120,15 @@ pub async fn insert_bank_balances( } }; - add_balance("paypal", paypal); - add_balance("brex", brex); - add_balance("tremendous", tremendous); + if let Some(paypal) = paypal { + add_balance("paypal", paypal); + } + if let Some(brex) = brex { + add_balance("brex", brex); + } + if let Some(tremendous) = tremendous { + add_balance("tremendous", tremendous); + } sqlx::query!( "