You've already forked pages
forked from didirus/AstralRinth
Add bank balances to DB (#3860)
This commit is contained in:
17
apps/labrinth/.sqlx/query-dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931.json
generated
Normal file
17
apps/labrinth/.sqlx/query-dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931.json
generated
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO payout_sources_balance (account_type, amount, pending, recorded)\n SELECT * FROM UNNEST ($1::text[], $2::numeric[], $3::boolean[], $4::timestamptz[])\n ON CONFLICT (recorded, account_type, pending)\n DO UPDATE SET amount = EXCLUDED.amount\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"TextArray",
|
||||
"NumericArray",
|
||||
"BoolArray",
|
||||
"TimestamptzArray"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "dbdf1cce30709c3e1066d0a9156e12ce9e4773e3678da6f10f459a26bd0f3931"
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Add migration script here
|
||||
CREATE TABLE payout_sources_balance (
|
||||
account_type TEXT NOT NULL,
|
||||
amount numeric(40, 20) NOT NULL,
|
||||
pending BOOLEAN NOT NULL,
|
||||
recorded timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (recorded, account_type, pending)
|
||||
);
|
||||
@@ -16,6 +16,7 @@ use util::cors::default_cors;
|
||||
|
||||
use crate::background_task::update_versions;
|
||||
use crate::queue::moderation::AutomatedModerationQueue;
|
||||
use crate::queue::payouts::insert_bank_balances;
|
||||
use crate::util::env::{parse_strings_from_var, parse_var};
|
||||
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
|
||||
use sync::friends::handle_pubsub;
|
||||
@@ -252,6 +253,23 @@ pub fn app_setup(
|
||||
};
|
||||
|
||||
let payouts_queue = web::Data::new(PayoutsQueue::new());
|
||||
|
||||
let payouts_queue_ref = payouts_queue.clone();
|
||||
let pool_ref = pool.clone();
|
||||
scheduler.run(Duration::from_secs(60 * 60 * 6), move || {
|
||||
let payouts_queue_ref = payouts_queue_ref.clone();
|
||||
let pool_ref = pool_ref.clone();
|
||||
async move {
|
||||
info!("Started updating bank balances");
|
||||
let result =
|
||||
insert_bank_balances(&payouts_queue_ref, &pool_ref).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Bank balance update failed: {:?}", e);
|
||||
}
|
||||
info!("Done updating bank balances");
|
||||
}
|
||||
});
|
||||
|
||||
let active_sockets = web::Data::new(ActiveSockets::default());
|
||||
|
||||
{
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::models::payouts::{
|
||||
use crate::models::projects::MonetizationStatus;
|
||||
use crate::routes::ApiError;
|
||||
use base64::Engine;
|
||||
use chrono::{DateTime, Datelike, Duration, TimeZone, Utc};
|
||||
use chrono::{DateTime, Datelike, Duration, NaiveTime, TimeZone, Utc};
|
||||
use dashmap::DashMap;
|
||||
use futures::TryStreamExt;
|
||||
use reqwest::Method;
|
||||
@@ -1072,3 +1072,61 @@ pub async fn insert_payouts(
|
||||
.execute(&mut **transaction)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn insert_bank_balances(
|
||||
payouts: &PayoutsQueue,
|
||||
pool: &PgPool,
|
||||
) -> Result<(), ApiError> {
|
||||
let mut transaction = pool.begin().await?;
|
||||
|
||||
let (paypal, brex, tremendous) = futures::future::try_join3(
|
||||
PayoutsQueue::get_paypal_balance(),
|
||||
PayoutsQueue::get_brex_balance(),
|
||||
payouts.get_tremendous_balance(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut insert_account_types = Vec::new();
|
||||
let mut insert_amounts = Vec::new();
|
||||
let mut insert_pending = Vec::new();
|
||||
let mut insert_recorded = Vec::new();
|
||||
|
||||
let now = Utc::now();
|
||||
let today = now.date_naive().and_time(NaiveTime::MIN).and_utc();
|
||||
|
||||
let mut add_balance =
|
||||
|account_type: &str, balance: Option<AccountBalance>| {
|
||||
if let Some(balance) = balance {
|
||||
insert_account_types.push(account_type.to_string());
|
||||
insert_amounts.push(balance.available);
|
||||
insert_pending.push(false);
|
||||
insert_recorded.push(today);
|
||||
|
||||
insert_account_types.push(account_type.to_string());
|
||||
insert_amounts.push(balance.pending);
|
||||
insert_pending.push(true);
|
||||
insert_recorded.push(today);
|
||||
}
|
||||
};
|
||||
|
||||
add_balance("paypal", paypal);
|
||||
add_balance("brex", brex);
|
||||
add_balance("tremendous", tremendous);
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
INSERT INTO payout_sources_balance (account_type, amount, pending, recorded)
|
||||
SELECT * FROM UNNEST ($1::text[], $2::numeric[], $3::boolean[], $4::timestamptz[])
|
||||
ON CONFLICT (recorded, account_type, pending)
|
||||
DO UPDATE SET amount = EXCLUDED.amount
|
||||
",
|
||||
&insert_account_types[..],
|
||||
&insert_amounts[..],
|
||||
&insert_pending[..],
|
||||
&insert_recorded[..],
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -8,13 +8,12 @@ use crate::models::threads::MessageBody;
|
||||
use crate::queue::analytics::AnalyticsQueue;
|
||||
use crate::queue::maxmind::MaxMindIndexer;
|
||||
use crate::queue::moderation::AUTOMOD_ID;
|
||||
use crate::queue::payouts::PayoutsQueue;
|
||||
use crate::queue::session::AuthQueue;
|
||||
use crate::routes::ApiError;
|
||||
use crate::search::SearchConfig;
|
||||
use crate::util::date::get_current_tenths_of_ms;
|
||||
use crate::util::guards::admin_key_guard;
|
||||
use actix_web::{HttpRequest, HttpResponse, get, patch, post, web};
|
||||
use actix_web::{HttpRequest, HttpResponse, patch, post, web};
|
||||
use serde::Deserialize;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashMap;
|
||||
@@ -28,7 +27,6 @@ pub fn config(cfg: &mut web::ServiceConfig) {
|
||||
web::scope("admin")
|
||||
.service(count_download)
|
||||
.service(force_reindex)
|
||||
.service(get_balances)
|
||||
.service(delphi_result_ingest),
|
||||
);
|
||||
}
|
||||
@@ -166,24 +164,6 @@ pub async fn force_reindex(
|
||||
Ok(HttpResponse::NoContent().finish())
|
||||
}
|
||||
|
||||
#[get("/_balances", guard = "admin_key_guard")]
|
||||
pub async fn get_balances(
|
||||
payouts: web::Data<PayoutsQueue>,
|
||||
) -> Result<HttpResponse, ApiError> {
|
||||
let (paypal, brex, tremendous) = futures::future::try_join3(
|
||||
PayoutsQueue::get_paypal_balance(),
|
||||
PayoutsQueue::get_brex_balance(),
|
||||
payouts.get_tremendous_balance(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(HttpResponse::Ok().json(serde_json::json!({
|
||||
"paypal": paypal,
|
||||
"brex": brex,
|
||||
"tremendous": tremendous,
|
||||
})))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct DelphiIngest {
|
||||
pub url: String,
|
||||
|
||||
Reference in New Issue
Block a user