Link customer ID to Anrok transaction (#4509)

* Mark transactions with unresolvable addresses as unresolved

* Add customer_name + customer_id to anrok transactions

* Increase rate of Anrok syn

* Remove timer from update_tax_transactions

* chore: query cache, clippy, fmt
This commit is contained in:
François-Xavier Talbot
2025-10-06 17:06:57 +01:00
committed by GitHub
parent dbc64afe48
commit 9589e23118
6 changed files with 97 additions and 51 deletions

View File

@@ -1,6 +1,6 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tWHERE\n\t\t\t status = 'succeeded'\n\t\t\t AND tax_platform_id IS NULL\n AND payment_platform_id IS NOT NULL\n\t\t\tORDER BY due ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n\t\t\tLIMIT $1\n\t\t\t", "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tWHERE\n\t\t\t status = 'succeeded'\n\t\t\t AND tax_platform_id IS NULL\n AND payment_platform_id IS NOT NULL\n\t\t\tORDER BY due ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n OFFSET $1\n\t\t\tLIMIT $2\n\t\t\t",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@@ -101,6 +101,7 @@
], ],
"parameters": { "parameters": {
"Left": [ "Left": [
"Int8",
"Int8" "Int8"
] ]
}, },
@@ -126,5 +127,5 @@
true true
] ]
}, },
"hash": "43667ed4b45ca5351276138443870b98a10aa94f4d878a6c8fa4d8ed5d718196" "hash": "050e755134f6d1f09de805ae2cd0f7ca8f6efb96be9f070c43db7fd2049af2d2"
} }

View File

@@ -89,7 +89,7 @@ impl TryFrom<ChargeQueryResult> for DBCharge {
} }
macro_rules! select_charges_with_predicate { macro_rules! select_charges_with_predicate {
($predicate:tt, $param:ident) => { ($predicate:tt $(, $( $param0:expr $(, $param:expr)* $(,)? )?)?) => {
sqlx::query_as!( sqlx::query_as!(
ChargeQueryResult, ChargeQueryResult,
r#" r#"
@@ -107,7 +107,7 @@ macro_rules! select_charges_with_predicate {
FROM charges FROM charges
"# "#
+ $predicate, + $predicate,
$param $( $( $param0, $( $param ),* )? )?
) )
}; };
} }
@@ -344,6 +344,7 @@ impl DBCharge {
/// Charges are locked. /// Charges are locked.
pub async fn get_missing_tax_identifier_lock( pub async fn get_missing_tax_identifier_lock(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
offset: i64,
limit: i64, limit: i64,
) -> Result<Vec<DBCharge>, DatabaseError> { ) -> Result<Vec<DBCharge>, DatabaseError> {
let res = select_charges_with_predicate!( let res = select_charges_with_predicate!(
@@ -354,8 +355,10 @@ impl DBCharge {
AND payment_platform_id IS NOT NULL AND payment_platform_id IS NOT NULL
ORDER BY due ASC ORDER BY due ASC
FOR NO KEY UPDATE SKIP LOCKED FOR NO KEY UPDATE SKIP LOCKED
LIMIT $1 OFFSET $1
LIMIT $2
", ",
offset,
limit limit
) )
.fetch_all(exec) .fetch_all(exec)

View File

@@ -36,7 +36,7 @@ use tracing::{debug, error, info, warn};
/// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching /// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching
/// Anrok API limits. /// Anrok API limits.
/// ///
/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up /// The global rate limit for Anrok API operations is 10 RPS, so we run ~8 requests every second up
/// to the specified limit of processed charges. /// to the specified limit of processed charges.
async fn update_tax_amounts( async fn update_tax_amounts(
pg: &PgPool, pg: &PgPool,
@@ -55,7 +55,7 @@ async fn update_tax_amounts(
let mut txn = pg.begin().await?; let mut txn = pg.begin().await?;
let charges = DBCharge::get_updateable_lock(&mut *txn, 6).await?; let charges = DBCharge::get_updateable_lock(&mut *txn, 8).await?;
if charges.is_empty() { if charges.is_empty() {
info!("No more charges to process"); info!("No more charges to process");
@@ -181,6 +181,8 @@ async fn update_tax_amounts(
tax_id.tax_processor_id, tax_id.tax_processor_id,
charge.amount, charge.amount,
)], )],
customer_id: None,
customer_name: None,
}) })
.await? .await?
.tax_amount_to_collect; .tax_amount_to_collect;
@@ -266,7 +268,7 @@ async fn update_tax_amounts(
/// ///
/// Same as update_tax_amounts, this is done within a timer to avoid reaching Anrok API limits. /// Same as update_tax_amounts, this is done within a timer to avoid reaching Anrok API limits.
/// ///
/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up /// The global rate limit for Anrok API operations is 10 RPS, so we run ~8 requests every second up
/// to the specified limit of processed charges. /// to the specified limit of processed charges.
async fn update_anrok_transactions( async fn update_anrok_transactions(
pg: &PgPool, pg: &PgPool,
@@ -282,7 +284,7 @@ async fn update_anrok_transactions(
anrok_client: &anrok::Client, anrok_client: &anrok::Client,
mut c: DBCharge, mut c: DBCharge,
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
let (customer_address, tax_platform_id) = 'a: { let (customer_address, tax_platform_id, customer_id) = 'a: {
let (pi, tax_platform_id) = if c.type_ == ChargeType::Refund { let (pi, tax_platform_id) = if c.type_ == ChargeType::Refund {
// the payment_platform_id should be an re or a pyr // the payment_platform_id should be an re or a pyr
@@ -344,16 +346,6 @@ async fn update_anrok_transactions(
.and_then(|x| x.into_object()) .and_then(|x| x.into_object())
.and_then(|x| x.billing_details.address); .and_then(|x| x.billing_details.address);
match pi_stripe_address {
Some(address) => break 'a (address, tax_platform_id),
None => {
warn!(
"A PaymentMethod for '{:?}' has no address; falling back to the customer's address",
pi.customer.map(|x| x.id())
);
}
};
let stripe_customer_id = let stripe_customer_id =
DBUser::get_id(c.user_id, &mut **txn, redis) DBUser::get_id(c.user_id, &mut **txn, redis)
.await? .await?
@@ -376,6 +368,18 @@ async fn update_anrok_transactions(
)) ))
})?; })?;
match pi_stripe_address {
Some(address) => {
break 'a (address, tax_platform_id, customer_id);
}
None => {
warn!(
"A PaymentMethod for '{:?}' has no address; falling back to the customer's address",
pi.customer.map(|x| x.id())
);
}
};
let customer = let customer =
stripe::Customer::retrieve(stripe_client, &customer_id, &[]) stripe::Customer::retrieve(stripe_client, &customer_id, &[])
.await?; .await?;
@@ -386,7 +390,7 @@ async fn update_anrok_transactions(
) )
})?; })?;
(address, tax_platform_id) (address, tax_platform_id, customer_id)
}; };
let tax_id = DBProductsTaxIdentifier::get_price(c.price_id, &mut **txn) let tax_id = DBProductsTaxIdentifier::get_price(c.price_id, &mut **txn)
@@ -396,7 +400,7 @@ async fn update_anrok_transactions(
// Note: if the tax amount that was charged to the customer is *different* than // Note: if the tax amount that was charged to the customer is *different* than
// what it *should* be NOW, we will take on a loss here. // what it *should* be NOW, we will take on a loss here.
let should_have_collected = anrok_client let result = anrok_client
.create_or_update_txn(&anrok::Transaction { .create_or_update_txn(&anrok::Transaction {
id: tax_platform_id.clone(), id: tax_platform_id.clone(),
fields: anrok::TransactionFields { fields: anrok::TransactionFields {
@@ -412,53 +416,72 @@ async fn update_anrok_transactions(
c.tax_amount + c.amount, c.tax_amount + c.amount,
), ),
], ],
customer_id: Some(format!("stripe:cust:{customer_id}")),
customer_name: Some("Customer".to_owned()),
}, },
}) })
.await? .await;
.tax_amount_to_collect;
let drift = should_have_collected - c.tax_amount; match result {
Ok(response) => {
let should_have_collected = response.tax_amount_to_collect;
c.tax_drift_loss = Some(drift); let drift = should_have_collected - c.tax_amount;
c.tax_platform_id = Some(tax_platform_id);
c.upsert(txn).await?;
Ok(()) c.tax_drift_loss = Some(drift);
c.tax_platform_id = Some(tax_platform_id);
c.upsert(txn).await?;
Ok(())
}
Err(error) => {
// This isn't gonna be a fixable error, so mark the transaction as unresolvable.
if error
.is_conflict_and(|x| x == "customerAddressCouldNotResolve")
{
c.tax_platform_id = Some("unresolved".to_owned());
c.upsert(txn).await?;
Ok(())
} else {
Err(error.into())
}
}
}
} }
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut processed_charges = 0; let mut processed_charges = 0;
loop { let mut offset = 0;
interval.tick().await;
loop {
let mut txn = pg.begin().await?; let mut txn = pg.begin().await?;
let charges = let mut charges =
DBCharge::get_missing_tax_identifier_lock(&mut *txn, 6).await?; DBCharge::get_missing_tax_identifier_lock(&mut *txn, offset, 1)
.await?;
if charges.is_empty() { let Some(c) = charges.pop() else {
info!("No more charges to process"); info!("No more charges to process");
break Ok(()); break Ok(());
} };
for c in charges { let charge_id = to_base62(c.id.0 as u64);
processed_charges += 1; let user_id = to_base62(c.user_id.0 as u64);
let charge_id = to_base62(c.id.0 as u64); let result =
let user_id = to_base62(c.user_id.0 as u64); process_charge(stripe_client, &mut txn, redis, anrok_client, c)
.await;
let result = processed_charges += 1;
process_charge(stripe_client, &mut txn, redis, anrok_client, c)
.await;
if let Err(e) = result { if let Err(e) = result {
warn!( warn!(
"Error processing charge '{charge_id}' for user '{user_id}': {e}" "Error processing charge '{charge_id}' for user '{user_id}': {e}"
); );
}
offset += 1;
} }
txn.commit().await?; txn.commit().await?;
@@ -985,14 +1008,14 @@ pub async fn index_subscriptions(
&redis, &redis,
&anrok_client, &anrok_client,
&stripe_client, &stripe_client,
250, 750,
), ),
) )
.await; .await;
run_and_time( run_and_time(
"update_tax_amounts", "update_tax_amounts",
update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 100), update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 50),
) )
.await; .await;

View File

@@ -290,6 +290,8 @@ pub async fn refund_charge(
accounting_time: Utc::now(), accounting_time: Utc::now(),
accounting_time_zone: anrok::AccountingTimeZone::Utc, accounting_time_zone: anrok::AccountingTimeZone::Utc,
line_items: vec![anrok::LineItem::new_including_tax_amount(tax_id, -refund_amount)], line_items: vec![anrok::LineItem::new_including_tax_amount(tax_id, -refund_amount)],
customer_id: Some(format!("stripe:cust:{}", user.stripe_customer_id.unwrap_or_else(|| "unknown".to_owned()))),
customer_name: Some("Customer".to_owned()),
} }
} }
).await; ).await;

View File

@@ -416,6 +416,8 @@ pub async fn create_or_update_payment_intent(
product_info.tax_identifier.tax_processor_id, product_info.tax_identifier.tax_processor_id,
charge_data.amount, charge_data.amount,
)], )],
customer_id: None,
customer_name: None,
}) })
.await?; .await?;

View File

@@ -109,6 +109,8 @@ pub struct TransactionFields {
pub accounting_time: DateTime<Utc>, pub accounting_time: DateTime<Utc>,
pub accounting_time_zone: AccountingTimeZone, pub accounting_time_zone: AccountingTimeZone,
pub line_items: Vec<LineItem>, pub line_items: Vec<LineItem>,
pub customer_name: Option<String>,
pub customer_id: Option<String>,
} }
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
@@ -131,6 +133,19 @@ pub enum AnrokError {
Other(#[from] reqwest::Error), Other(#[from] reqwest::Error),
} }
impl AnrokError {
pub fn is_conflict_and<F>(&self, pred: F) -> bool
where
F: FnOnce(&str) -> bool,
{
if let AnrokError::Conflict(message) = self {
pred(message)
} else {
false
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Client { pub struct Client {
client: reqwest::Client, client: reqwest::Client,