Move update_bank_balances to billing task, don't fail every provider if one fails (#4332)

This commit is contained in:
François-Xavier Talbot
2025-09-03 14:12:34 +01:00
committed by GitHub
parent d23b925bb9
commit bd918c7616
3 changed files with 45 additions and 33 deletions

View File

@@ -1,10 +1,12 @@
use crate::database::redis::RedisPool; use crate::database::redis::RedisPool;
use crate::queue::payouts::process_payout; use crate::queue::payouts::{
PayoutsQueue, insert_bank_balances, process_payout,
};
use crate::search::indexing::index_projects; use crate::search::indexing::index_projects;
use crate::{database, search}; use crate::{database, search};
use clap::ValueEnum; use clap::ValueEnum;
use sqlx::Postgres; use sqlx::Postgres;
use tracing::{info, warn}; use tracing::{error, info, warn};
#[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)] #[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)]
#[clap(rename_all = "kebab_case")] #[clap(rename_all = "kebab_case")]
@@ -37,10 +39,12 @@ impl BackgroundTask {
IndexBilling => { IndexBilling => {
crate::routes::internal::billing::index_billing( crate::routes::internal::billing::index_billing(
stripe_client, stripe_client,
pool, pool.clone(),
redis_pool, redis_pool,
) )
.await .await;
update_bank_balances(pool).await;
} }
IndexSubscriptions => { IndexSubscriptions => {
crate::routes::internal::billing::index_subscriptions( crate::routes::internal::billing::index_subscriptions(
@@ -52,6 +56,15 @@ impl BackgroundTask {
} }
} }
pub async fn update_bank_balances(pool: sqlx::Pool<Postgres>) {
let payouts_queue = PayoutsQueue::new();
match insert_bank_balances(&payouts_queue, &pool).await {
Ok(_) => info!("Bank balances updated successfully"),
Err(error) => error!(%error, "Bank balances update failed"),
}
}
pub async fn run_migrations() { pub async fn run_migrations() {
database::check_for_migrations() database::check_for_migrations()
.await .await

View File

@@ -16,7 +16,6 @@ use util::cors::default_cors;
use crate::background_task::update_versions; use crate::background_task::update_versions;
use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::moderation::AutomatedModerationQueue;
use crate::queue::payouts::insert_bank_balances;
use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::env::{parse_strings_from_var, parse_var};
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
use sync::friends::handle_pubsub; use sync::friends::handle_pubsub;
@@ -252,24 +251,6 @@ pub fn app_setup(
.to_string(), .to_string(),
}; };
let payouts_queue = web::Data::new(PayoutsQueue::new());
let payouts_queue_ref = payouts_queue.clone();
let pool_ref = pool.clone();
scheduler.run(Duration::from_secs(60 * 60 * 6), move || {
let payouts_queue_ref = payouts_queue_ref.clone();
let pool_ref = pool_ref.clone();
async move {
info!("Started updating bank balances");
let result =
insert_bank_balances(&payouts_queue_ref, &pool_ref).await;
if let Err(e) = result {
warn!("Bank balance update failed: {:?}", e);
}
info!("Done updating bank balances");
}
});
let active_sockets = web::Data::new(ActiveSockets::default()); let active_sockets = web::Data::new(ActiveSockets::default());
{ {
@@ -292,7 +273,7 @@ pub fn app_setup(
ip_salt, ip_salt,
search_config, search_config,
session_queue, session_queue,
payouts_queue, payouts_queue: web::Data::new(PayoutsQueue::new()),
analytics_queue, analytics_queue,
active_sockets, active_sockets,
automated_moderation_queue, automated_moderation_queue,

View File

@@ -17,6 +17,7 @@ use sqlx::PgPool;
use sqlx::postgres::PgQueryResult; use sqlx::postgres::PgQueryResult;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::error;
pub struct PayoutsQueue { pub struct PayoutsQueue {
credential: RwLock<Option<PayPalCredentials>>, credential: RwLock<Option<PayPalCredentials>>,
@@ -1078,12 +1079,23 @@ pub async fn insert_bank_balances(
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
let mut transaction = pool.begin().await?; let mut transaction = pool.begin().await?;
let (paypal, brex, tremendous) = futures::future::try_join3( let paypal = PayoutsQueue::get_paypal_balance()
PayoutsQueue::get_paypal_balance(), .await
PayoutsQueue::get_brex_balance(), .inspect_err(|error| error!(%error, "Failure getting PayPal balance"))
payouts.get_tremendous_balance(), .ok();
)
.await?; let brex = PayoutsQueue::get_brex_balance()
.await
.inspect_err(|error| error!(%error, "Failure getting Brex balance"))
.ok();
let tremendous = payouts
.get_tremendous_balance()
.await
.inspect_err(
|error| error!(%error, "Failure getting Tremendous balance"),
)
.ok();
let mut insert_account_types = Vec::new(); let mut insert_account_types = Vec::new();
let mut insert_amounts = Vec::new(); let mut insert_amounts = Vec::new();
@@ -1108,9 +1120,15 @@ pub async fn insert_bank_balances(
} }
}; };
add_balance("paypal", paypal); if let Some(paypal) = paypal {
add_balance("brex", brex); add_balance("paypal", paypal);
add_balance("tremendous", tremendous); }
if let Some(brex) = brex {
add_balance("brex", brex);
}
if let Some(tremendous) = tremendous {
add_balance("tremendous", tremendous);
}
sqlx::query!( sqlx::query!(
" "