diff --git a/apps/labrinth/.sqlx/query-43667ed4b45ca5351276138443870b98a10aa94f4d878a6c8fa4d8ed5d718196.json b/apps/labrinth/.sqlx/query-050e755134f6d1f09de805ae2cd0f7ca8f6efb96be9f070c43db7fd2049af2d2.json similarity index 95% rename from apps/labrinth/.sqlx/query-43667ed4b45ca5351276138443870b98a10aa94f4d878a6c8fa4d8ed5d718196.json rename to apps/labrinth/.sqlx/query-050e755134f6d1f09de805ae2cd0f7ca8f6efb96be9f070c43db7fd2049af2d2.json index e9cc40f0..5779cc0f 100644 --- a/apps/labrinth/.sqlx/query-43667ed4b45ca5351276138443870b98a10aa94f4d878a6c8fa4d8ed5d718196.json +++ b/apps/labrinth/.sqlx/query-050e755134f6d1f09de805ae2cd0f7ca8f6efb96be9f070c43db7fd2049af2d2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tWHERE\n\t\t\t status = 'succeeded'\n\t\t\t AND tax_platform_id IS NULL\n AND payment_platform_id IS NOT NULL\n\t\t\tORDER BY due ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n\t\t\tLIMIT $1\n\t\t\t", + "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tWHERE\n\t\t\t status = 'succeeded'\n\t\t\t AND tax_platform_id IS NULL\n AND payment_platform_id IS NOT NULL\n\t\t\tORDER BY due ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n OFFSET $1\n\t\t\tLIMIT $2\n\t\t\t", "describe": { "columns": [ { @@ -101,6 +101,7 @@ ], "parameters": { "Left": [ + "Int8", "Int8" ] }, @@ -126,5 +127,5 @@ true ] }, - "hash": "43667ed4b45ca5351276138443870b98a10aa94f4d878a6c8fa4d8ed5d718196" + "hash": "050e755134f6d1f09de805ae2cd0f7ca8f6efb96be9f070c43db7fd2049af2d2" } diff --git a/apps/labrinth/src/database/models/charge_item.rs b/apps/labrinth/src/database/models/charge_item.rs index d994bc6b..1e7bdc40 100644 --- a/apps/labrinth/src/database/models/charge_item.rs +++ b/apps/labrinth/src/database/models/charge_item.rs @@ -89,7 +89,7 @@ impl TryFrom for DBCharge { } macro_rules! select_charges_with_predicate { - ($predicate:tt, $param:ident) => { + ($predicate:tt $(, $( $param0:expr $(, $param:expr)* $(,)? )?)?) => { sqlx::query_as!( ChargeQueryResult, r#" @@ -107,7 +107,7 @@ macro_rules! select_charges_with_predicate { FROM charges "# + $predicate, - $param + $( $( $param0, $( $param ),* )? )? ) }; } @@ -344,6 +344,7 @@ impl DBCharge { /// Charges are locked. pub async fn get_missing_tax_identifier_lock( exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + offset: i64, limit: i64, ) -> Result, DatabaseError> { let res = select_charges_with_predicate!( @@ -354,8 +355,10 @@ impl DBCharge { AND payment_platform_id IS NOT NULL ORDER BY due ASC FOR NO KEY UPDATE SKIP LOCKED - LIMIT $1 + OFFSET $1 + LIMIT $2 ", + offset, limit ) .fetch_all(exec) diff --git a/apps/labrinth/src/queue/billing.rs b/apps/labrinth/src/queue/billing.rs index ae3e474f..b9bbe4e2 100644 --- a/apps/labrinth/src/queue/billing.rs +++ b/apps/labrinth/src/queue/billing.rs @@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn}; /// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching /// Anrok API limits. /// -/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up +/// The global rate limit for Anrok API operations is 10 RPS, so we run ~8 requests every second up /// to the specified limit of processed charges. async fn update_tax_amounts( pg: &PgPool, @@ -55,7 +55,7 @@ async fn update_tax_amounts( let mut txn = pg.begin().await?; - let charges = DBCharge::get_updateable_lock(&mut *txn, 6).await?; + let charges = DBCharge::get_updateable_lock(&mut *txn, 8).await?; if charges.is_empty() { info!("No more charges to process"); @@ -181,6 +181,8 @@ async fn update_tax_amounts( tax_id.tax_processor_id, charge.amount, )], + customer_id: None, + customer_name: None, }) .await? .tax_amount_to_collect; @@ -266,7 +268,7 @@ async fn update_tax_amounts( /// /// Same as update_tax_amounts, this is done within a timer to avoid reaching Anrok API limits. /// -/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up +/// The global rate limit for Anrok API operations is 10 RPS, so we run ~8 requests every second up /// to the specified limit of processed charges. async fn update_anrok_transactions( pg: &PgPool, @@ -282,7 +284,7 @@ async fn update_anrok_transactions( anrok_client: &anrok::Client, mut c: DBCharge, ) -> Result<(), ApiError> { - let (customer_address, tax_platform_id) = 'a: { + let (customer_address, tax_platform_id, customer_id) = 'a: { let (pi, tax_platform_id) = if c.type_ == ChargeType::Refund { // the payment_platform_id should be an re or a pyr @@ -344,16 +346,6 @@ async fn update_anrok_transactions( .and_then(|x| x.into_object()) .and_then(|x| x.billing_details.address); - match pi_stripe_address { - Some(address) => break 'a (address, tax_platform_id), - None => { - warn!( - "A PaymentMethod for '{:?}' has no address; falling back to the customer's address", - pi.customer.map(|x| x.id()) - ); - } - }; - let stripe_customer_id = DBUser::get_id(c.user_id, &mut **txn, redis) .await? @@ -376,6 +368,18 @@ async fn update_anrok_transactions( )) })?; + match pi_stripe_address { + Some(address) => { + break 'a (address, tax_platform_id, customer_id); + } + None => { + warn!( + "A PaymentMethod for '{:?}' has no address; falling back to the customer's address", + pi.customer.map(|x| x.id()) + ); + } + }; + let customer = stripe::Customer::retrieve(stripe_client, &customer_id, &[]) .await?; @@ -386,7 +390,7 @@ async fn update_anrok_transactions( ) })?; - (address, tax_platform_id) + (address, tax_platform_id, customer_id) }; let tax_id = DBProductsTaxIdentifier::get_price(c.price_id, &mut **txn) @@ -396,7 +400,7 @@ async fn update_anrok_transactions( // Note: if the tax amount that was charged to the customer is *different* than // what it *should* be NOW, we will take on a loss here. - let should_have_collected = anrok_client + let result = anrok_client .create_or_update_txn(&anrok::Transaction { id: tax_platform_id.clone(), fields: anrok::TransactionFields { @@ -412,53 +416,72 @@ async fn update_anrok_transactions( c.tax_amount + c.amount, ), ], + customer_id: Some(format!("stripe:cust:{customer_id}")), + customer_name: Some("Customer".to_owned()), }, }) - .await? - .tax_amount_to_collect; + .await; - let drift = should_have_collected - c.tax_amount; + match result { + Ok(response) => { + let should_have_collected = response.tax_amount_to_collect; - c.tax_drift_loss = Some(drift); - c.tax_platform_id = Some(tax_platform_id); - c.upsert(txn).await?; + let drift = should_have_collected - c.tax_amount; - Ok(()) + c.tax_drift_loss = Some(drift); + c.tax_platform_id = Some(tax_platform_id); + c.upsert(txn).await?; + + Ok(()) + } + + Err(error) => { + // This isn't gonna be a fixable error, so mark the transaction as unresolvable. + if error + .is_conflict_and(|x| x == "customerAddressCouldNotResolve") + { + c.tax_platform_id = Some("unresolved".to_owned()); + c.upsert(txn).await?; + + Ok(()) + } else { + Err(error.into()) + } + } + } } - let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut processed_charges = 0; - loop { - interval.tick().await; + let mut offset = 0; + loop { let mut txn = pg.begin().await?; - let charges = - DBCharge::get_missing_tax_identifier_lock(&mut *txn, 6).await?; + let mut charges = + DBCharge::get_missing_tax_identifier_lock(&mut *txn, offset, 1) + .await?; - if charges.is_empty() { + let Some(c) = charges.pop() else { info!("No more charges to process"); break Ok(()); - } + }; - for c in charges { - processed_charges += 1; + let charge_id = to_base62(c.id.0 as u64); + let user_id = to_base62(c.user_id.0 as u64); - let charge_id = to_base62(c.id.0 as u64); - let user_id = to_base62(c.user_id.0 as u64); + let result = + process_charge(stripe_client, &mut txn, redis, anrok_client, c) + .await; - let result = - process_charge(stripe_client, &mut txn, redis, anrok_client, c) - .await; + processed_charges += 1; - if let Err(e) = result { - warn!( - "Error processing charge '{charge_id}' for user '{user_id}': {e}" - ); - } + if let Err(e) = result { + warn!( + "Error processing charge '{charge_id}' for user '{user_id}': {e}" + ); + + offset += 1; } txn.commit().await?; @@ -985,14 +1008,14 @@ pub async fn index_subscriptions( &redis, &anrok_client, &stripe_client, - 250, + 750, ), ) .await; run_and_time( "update_tax_amounts", - update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 100), + update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 50), ) .await; diff --git a/apps/labrinth/src/routes/internal/billing.rs b/apps/labrinth/src/routes/internal/billing.rs index a3c29986..a2e230ca 100644 --- a/apps/labrinth/src/routes/internal/billing.rs +++ b/apps/labrinth/src/routes/internal/billing.rs @@ -290,6 +290,8 @@ pub async fn refund_charge( accounting_time: Utc::now(), accounting_time_zone: anrok::AccountingTimeZone::Utc, line_items: vec![anrok::LineItem::new_including_tax_amount(tax_id, -refund_amount)], + customer_id: Some(format!("stripe:cust:{}", user.stripe_customer_id.unwrap_or_else(|| "unknown".to_owned()))), + customer_name: Some("Customer".to_owned()), } } ).await; diff --git a/apps/labrinth/src/routes/internal/billing/payments.rs b/apps/labrinth/src/routes/internal/billing/payments.rs index 5407e335..8715b2f2 100644 --- a/apps/labrinth/src/routes/internal/billing/payments.rs +++ b/apps/labrinth/src/routes/internal/billing/payments.rs @@ -416,6 +416,8 @@ pub async fn create_or_update_payment_intent( product_info.tax_identifier.tax_processor_id, charge_data.amount, )], + customer_id: None, + customer_name: None, }) .await?; diff --git a/apps/labrinth/src/util/anrok.rs b/apps/labrinth/src/util/anrok.rs index 38d07e6b..e7d896dd 100644 --- a/apps/labrinth/src/util/anrok.rs +++ b/apps/labrinth/src/util/anrok.rs @@ -109,6 +109,8 @@ pub struct TransactionFields { pub accounting_time: DateTime, pub accounting_time_zone: AccountingTimeZone, pub line_items: Vec, + pub customer_name: Option, + pub customer_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -131,6 +133,19 @@ pub enum AnrokError { Other(#[from] reqwest::Error), } +impl AnrokError { + pub fn is_conflict_and(&self, pred: F) -> bool + where + F: FnOnce(&str) -> bool, + { + if let AnrokError::Conflict(message) = self { + pred(message) + } else { + false + } + } +} + #[derive(Clone)] pub struct Client { client: reqwest::Client,