diff --git a/apps/labrinth/.sqlx/query-dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931.json b/apps/labrinth/.sqlx/query-dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931.json new file mode 100644 index 00000000..d449d355 --- /dev/null +++ b/apps/labrinth/.sqlx/query-dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO payout_sources_balance (account_type, amount, pending, recorded)\n SELECT * FROM UNNEST ($1::text[], $2::numeric[], $3::boolean[], $4::timestamptz[])\n ON CONFLICT (recorded, account_type, pending)\n DO UPDATE SET amount = EXCLUDED.amount\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "TextArray", + "NumericArray", + "BoolArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931" +} diff --git a/apps/labrinth/migrations/20250628213541_payout-sources-recording.sql b/apps/labrinth/migrations/20250628213541_payout-sources-recording.sql new file mode 100644 index 00000000..b22e7595 --- /dev/null +++ b/apps/labrinth/migrations/20250628213541_payout-sources-recording.sql @@ -0,0 +1,8 @@ +-- Add migration script here +CREATE TABLE payout_sources_balance ( + account_type TEXT NOT NULL, + amount numeric(40, 20) NOT NULL, + pending BOOLEAN NOT NULL, + recorded timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (recorded, account_type, pending) +); diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 4318934e..24bfcc93 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -16,6 +16,7 @@ use util::cors::default_cors; use crate::background_task::update_versions; use crate::queue::moderation::AutomatedModerationQueue; +use crate::queue::payouts::insert_bank_balances; use crate::util::env::{parse_strings_from_var, parse_var}; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; use sync::friends::handle_pubsub; @@ -252,6 +253,23 @@ pub fn app_setup( }; let payouts_queue = web::Data::new(PayoutsQueue::new()); + + let payouts_queue_ref = payouts_queue.clone(); + let pool_ref = pool.clone(); + scheduler.run(Duration::from_secs(60 * 60 * 6), move || { + let payouts_queue_ref = payouts_queue_ref.clone(); + let pool_ref = pool_ref.clone(); + async move { + info!("Started updating bank balances"); + let result = + insert_bank_balances(&payouts_queue_ref, &pool_ref).await; + if let Err(e) = result { + warn!("Bank balance update failed: {:?}", e); + } + info!("Done updating bank balances"); + } + }); + let active_sockets = web::Data::new(ActiveSockets::default()); { diff --git a/apps/labrinth/src/queue/payouts.rs b/apps/labrinth/src/queue/payouts.rs index b2742b3b..47445939 100644 --- a/apps/labrinth/src/queue/payouts.rs +++ b/apps/labrinth/src/queue/payouts.rs @@ -5,7 +5,7 @@ use crate::models::payouts::{ use crate::models::projects::MonetizationStatus; use crate::routes::ApiError; use base64::Engine; -use chrono::{DateTime, Datelike, Duration, TimeZone, Utc}; +use chrono::{DateTime, Datelike, Duration, NaiveTime, TimeZone, Utc}; use dashmap::DashMap; use futures::TryStreamExt; use reqwest::Method; @@ -1072,3 +1072,61 @@ pub async fn insert_payouts( .execute(&mut **transaction) .await } + +pub async fn insert_bank_balances( + payouts: &PayoutsQueue, + pool: &PgPool, +) -> Result<(), ApiError> { + let mut transaction = pool.begin().await?; + + let (paypal, brex, tremendous) = futures::future::try_join3( + PayoutsQueue::get_paypal_balance(), + PayoutsQueue::get_brex_balance(), + payouts.get_tremendous_balance(), + ) + .await?; + + let mut insert_account_types = Vec::new(); + let mut insert_amounts = Vec::new(); + let mut insert_pending = Vec::new(); + let mut insert_recorded = Vec::new(); + + let now = Utc::now(); + let today = now.date_naive().and_time(NaiveTime::MIN).and_utc(); + + let mut add_balance = + |account_type: &str, balance: Option| { + if let Some(balance) = balance { + insert_account_types.push(account_type.to_string()); + insert_amounts.push(balance.available); + insert_pending.push(false); + insert_recorded.push(today); + + insert_account_types.push(account_type.to_string()); + insert_amounts.push(balance.pending); + insert_pending.push(true); + insert_recorded.push(today); + } + }; + + add_balance("paypal", paypal); + add_balance("brex", brex); + add_balance("tremendous", tremendous); + + sqlx::query!( + " + INSERT INTO payout_sources_balance (account_type, amount, pending, recorded) + SELECT * FROM UNNEST ($1::text[], $2::numeric[], $3::boolean[], $4::timestamptz[]) + ON CONFLICT (recorded, account_type, pending) + DO UPDATE SET amount = EXCLUDED.amount + ", + &insert_account_types[..], + &insert_amounts[..], + &insert_pending[..], + &insert_recorded[..], + ) + .execute(&mut *transaction) + .await?; + + Ok(()) +} diff --git a/apps/labrinth/src/routes/internal/admin.rs b/apps/labrinth/src/routes/internal/admin.rs index 0833b650..3be7e013 100644 --- a/apps/labrinth/src/routes/internal/admin.rs +++ b/apps/labrinth/src/routes/internal/admin.rs @@ -8,13 +8,12 @@ use crate::models::threads::MessageBody; use crate::queue::analytics::AnalyticsQueue; use crate::queue::maxmind::MaxMindIndexer; use crate::queue::moderation::AUTOMOD_ID; -use crate::queue::payouts::PayoutsQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::search::SearchConfig; use crate::util::date::get_current_tenths_of_ms; use crate::util::guards::admin_key_guard; -use actix_web::{HttpRequest, HttpResponse, get, patch, post, web}; +use actix_web::{HttpRequest, HttpResponse, patch, post, web}; use serde::Deserialize; use sqlx::PgPool; use std::collections::HashMap; @@ -28,7 +27,6 @@ pub fn config(cfg: &mut web::ServiceConfig) { web::scope("admin") .service(count_download) .service(force_reindex) - .service(get_balances) .service(delphi_result_ingest), ); } @@ -166,24 +164,6 @@ pub async fn force_reindex( Ok(HttpResponse::NoContent().finish()) } -#[get("/_balances", guard = "admin_key_guard")] -pub async fn get_balances( - payouts: web::Data, -) -> Result { - let (paypal, brex, tremendous) = futures::future::try_join3( - PayoutsQueue::get_paypal_balance(), - PayoutsQueue::get_brex_balance(), - payouts.get_tremendous_balance(), - ) - .await?; - - Ok(HttpResponse::Ok().json(serde_json::json!({ - "paypal": paypal, - "brex": brex, - "tremendous": tremendous, - }))) -} - #[derive(Deserialize)] pub struct DelphiIngest { pub url: String,