* Only update the PaymentMethod ID if not using placeholder ID

* comment

* Create Anrok transactions for all charges

* Fix comment

* Prefer using payment method's address rather than customer address

* chore: query cache, clippy, fmt

* Retrieve stripe address from PM

* chore: query cache, clippy, fmt

* fmt

* bring back the query cache

* Better address retrieval for updating tax amounts, always update tax_last_updated

* chore: query cache, clippy, fmt

* Don't set PM in ctoken interactive session for new PIs
This commit is contained in:
François-Xavier Talbot
2025-09-28 22:13:48 +01:00
committed by GitHub
parent d418eaee12
commit b4eba5a0d5
8 changed files with 209 additions and 55 deletions

View File

@@ -24,6 +24,7 @@ use crate::util::archon::ArchonClient;
use crate::util::archon::{CreateServerRequest, Specs};
use ariadne::ids::base62_impl::to_base62;
use chrono::Utc;
use futures::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use sqlx::PgPool;
use std::collections::HashSet;
@@ -115,7 +116,6 @@ pub async fn index_subscriptions(
let redis_ref = redis.clone();
struct ProcessedCharge {
charge: DBCharge,
new_tax_amount: i64,
product_name: String,
}
@@ -128,7 +128,9 @@ pub async fn index_subscriptions(
let pg = pg_ref.clone();
let redis = redis_ref.clone();
async move {
let charge_clone = charge.clone();
let op_fut = async move {
let tax_id = DBProductsTaxIdentifier::get_price(
charge.price_id,
&pg,
@@ -148,38 +150,6 @@ pub async fn index_subscriptions(
})?;
let stripe_address = 'a: {
let stripe_id: stripe::PaymentIntentId = charge
.payment_platform_id
.as_ref()
.and_then(|x| x.parse().ok())
.ok_or_else(|| {
ApiError::InvalidInput(
"Charge has no payment platform ID"
.to_owned(),
)
})?;
// Attempt retrieving the address via the payment intent's payment method
let pi = stripe::PaymentIntent::retrieve(
&stripe_client,
&stripe_id,
&["payment_method"],
)
.await?;
let pi_stripe_address = pi
.payment_method
.and_then(|x| x.into_object())
.and_then(|x| x.billing_details.address);
match pi_stripe_address {
Some(address) => break 'a address,
None => {
warn!("PaymentMethod had no address");
}
};
let stripe_customer_id =
DBUser::get_id(charge.user_id, &pg, &redis)
.await?
@@ -197,23 +167,48 @@ pub async fn index_subscriptions(
)
},
)
})?
.parse()
.map_err(|_| {
ApiError::InvalidInput(
"User Stripe customer ID was invalid".to_owned(),
)
})?;
let customer_id = stripe_customer_id.parse().map_err(|e| ApiError::InvalidInput(format!("Charge's Stripe customer ID was invalid ({e})")))?;
let customer = stripe::Customer::retrieve(
&stripe_client,
&customer_id,
&[],
&stripe_customer_id,
&["invoice_settings.default_payment_method"],
)
.await?;
customer.address.ok_or_else(|| {
let payment_method = customer
.invoice_settings
.and_then(|x| {
x.default_payment_method.and_then(|x| x.into_object())
})
.ok_or_else(|| {
ApiError::InvalidInput(
"Stripe customer had no address"
.to_owned(),
"Customer has no default payment method!".to_string(),
)
})?
})?;
let stripe_address = payment_method.billing_details.address;
// Attempt the default payment method's address first, then the customer's address.
match stripe_address {
Some(address) => break 'a address,
None => {
warn!("PaymentMethod had no address");
}
};
customer.address.ok_or_else(|| {
ApiError::InvalidInput(
"Couldn't get an address for the Stripe customer"
.to_owned(),
)
})?
};
let customer_address =
@@ -238,28 +233,29 @@ pub async fn index_subscriptions(
Result::<ProcessedCharge, ApiError>::Ok(
ProcessedCharge {
charge,
new_tax_amount: tax_amount,
product_name: product
.name
.unwrap_or_else(|| "Modrinth".to_owned()),
},
)
}
};
op_fut.then(move |res| async move { (charge_clone, res) })
})
.collect::<FuturesUnordered<_>>();
while let Some(result) = futures.next().await {
processed_charges += 1;
match result {
Ok(ProcessedCharge {
let mut charge = match result {
(
mut charge,
new_tax_amount,
product_name,
}) => {
charge.tax_last_updated = Some(Utc::now());
Ok(ProcessedCharge {
new_tax_amount,
product_name,
}),
) => {
if new_tax_amount != charge.tax_amount {
// The price of the subscription has changed, we need to insert a notification
// for this.
@@ -293,12 +289,16 @@ pub async fn index_subscriptions(
charge.tax_amount = new_tax_amount;
}
charge.upsert(&mut txn).await?;
charge
}
Err(error) => {
(charge, Err(error)) => {
error!(%error, "Error indexing tax amount on charge");
charge
}
}
};
charge.tax_last_updated = Some(Utc::now());
charge.upsert(&mut txn).await?;
}
txn.commit().await?;