Changes to handling of refunds in Anrok (#4556)

* Use negations, track transaction version/accounting time, use original charge accounting time in refunds

* query cache

* chore: query cache, clippy, fmt

* Fix tax drift calculation

* Fix migration

* Increase update_tax_transactions rate
This commit is contained in:
François-Xavier Talbot
2025-10-17 16:57:36 +01:00
committed by GitHub
parent b23d3e674f
commit 5db5bf4c4c
18 changed files with 530 additions and 318 deletions

View File

@@ -30,6 +30,8 @@ pub struct DBCharge {
pub tax_amount: i64,
pub tax_platform_id: Option<String>,
pub tax_last_updated: Option<DateTime<Utc>>,
pub tax_transaction_version: Option<i32>,
pub tax_platform_accounting_time: Option<DateTime<Utc>>,
// Net is always in USD
pub net: Option<i64>,
@@ -56,6 +58,8 @@ struct ChargeQueryResult {
tax_last_updated: Option<DateTime<Utc>>,
net: Option<i64>,
tax_drift_loss: Option<i64>,
tax_transaction_version: Option<i32>,
tax_platform_accounting_time: Option<DateTime<Utc>>,
}
impl TryFrom<ChargeQueryResult> for DBCharge {
@@ -84,6 +88,8 @@ impl TryFrom<ChargeQueryResult> for DBCharge {
net: r.net,
tax_last_updated: r.tax_last_updated,
tax_drift_loss: r.tax_drift_loss,
tax_transaction_version: r.tax_transaction_version,
tax_platform_accounting_time: r.tax_platform_accounting_time,
})
}
}
@@ -103,7 +109,9 @@ macro_rules! select_charges_with_predicate {
charges.parent_charge_id AS "parent_charge_id?",
charges.net AS "net?",
charges.tax_last_updated AS "tax_last_updated?",
charges.tax_drift_loss AS "tax_drift_loss?"
charges.tax_drift_loss AS "tax_drift_loss?",
charges.tax_transaction_version AS "tax_transaction_version?",
charges.tax_platform_accounting_time AS "tax_platform_accounting_time?"
FROM charges
"#
+ $predicate,
@@ -119,8 +127,8 @@ impl DBCharge {
) -> Result<DBChargeId, DatabaseError> {
sqlx::query!(
r#"
INSERT INTO charges (id, user_id, price_id, amount, currency_code, charge_type, status, due, last_attempt, subscription_id, subscription_interval, payment_platform, payment_platform_id, parent_charge_id, net, tax_amount, tax_platform_id, tax_last_updated, tax_drift_loss)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
INSERT INTO charges (id, user_id, price_id, amount, currency_code, charge_type, status, due, last_attempt, subscription_id, subscription_interval, payment_platform, payment_platform_id, parent_charge_id, net, tax_amount, tax_platform_id, tax_last_updated, tax_drift_loss, tax_transaction_version, tax_platform_accounting_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
ON CONFLICT (id)
DO UPDATE
SET status = EXCLUDED.status,
@@ -139,7 +147,9 @@ impl DBCharge {
amount = EXCLUDED.amount,
currency_code = EXCLUDED.currency_code,
charge_type = EXCLUDED.charge_type,
tax_drift_loss = EXCLUDED.tax_drift_loss
tax_drift_loss = EXCLUDED.tax_drift_loss,
tax_transaction_version = EXCLUDED.tax_transaction_version,
tax_platform_accounting_time = EXCLUDED.tax_platform_accounting_time
"#,
self.id.0,
self.user_id.0,
@@ -160,6 +170,8 @@ impl DBCharge {
self.tax_platform_id.as_deref(),
self.tax_last_updated,
self.tax_drift_loss,
self.tax_transaction_version,
self.tax_platform_accounting_time,
)
.execute(&mut **transaction)
.await?;

View File

@@ -432,12 +432,17 @@ async fn update_anrok_transactions(
match result {
Ok(response) => {
let should_have_collected = response.tax_amount_to_collect;
let version = response.version.ok_or_else(|| {
ApiError::InvalidInput(
"Anrok response is missing tax transaction version"
.to_owned(),
)
})?;
let drift = should_have_collected - c.tax_amount;
c.tax_drift_loss = Some(drift);
c.tax_drift_loss = Some(response.tax_amount_to_collect);
c.tax_platform_id = Some(tax_platform_id);
c.tax_transaction_version = Some(version);
c.tax_platform_accounting_time = Some(c.due);
c.upsert(txn).await?;
Ok(())
@@ -647,6 +652,8 @@ pub async fn try_process_user_redeemal(
net: None,
tax_last_updated: Some(Utc::now()),
tax_drift_loss: Some(0),
tax_transaction_version: None,
tax_platform_accounting_time: None,
}
.upsert(&mut txn)
.await?;
@@ -1016,7 +1023,7 @@ pub async fn index_subscriptions(
&redis,
&anrok_client,
&stripe_client,
750,
1000,
),
)
.await;

View File

@@ -267,6 +267,24 @@ pub async fn refund_charge(
.tax_identifier
.tax_processor_id;
let Some((
(
original_tax_platform_id,
original_tax_transaction_version,
),
original_tax_platform_accounting_time,
)) = charge
.tax_platform_id
.clone()
.zip(charge.tax_transaction_version)
.zip(charge.tax_platform_accounting_time)
else {
return Err(ApiError::InvalidInput(
"Charge is missing full tax information. Please wait for the original charge to be synchronized with the tax processor."
.to_owned(),
));
};
let refund = stripe::Refund::create(
&stripe_client,
CreateRefund {
@@ -281,13 +299,16 @@ pub async fn refund_charge(
)
.await?;
let anrok_txn_result = anrok_client.create_or_update_txn(
let anrok_txn_result = anrok_client.negate_or_create_partial_negation(
original_tax_platform_id,
original_tax_transaction_version,
charge.amount + charge.tax_amount,
&anrok::Transaction {
id: anrok::transaction_id_stripe_pyr(&refund.id),
fields: anrok::TransactionFields {
customer_address: anrok::Address::from_stripe_address(&billing_address),
currency_code: charge.currency_code.clone(),
accounting_time: Utc::now(),
accounting_time: original_tax_platform_accounting_time,
accounting_time_zone: anrok::AccountingTimeZone::Utc,
line_items: vec![anrok::LineItem::new_including_tax_amount(tax_id, -refund_amount)],
customer_id: Some(format!("stripe:cust:{}", user.stripe_customer_id.unwrap_or_else(|| "unknown".to_owned()))),
@@ -347,6 +368,8 @@ pub async fn refund_charge(
currency_code: charge.currency_code,
tax_last_updated: Some(Utc::now()),
tax_drift_loss: Some(0),
tax_transaction_version: None,
tax_platform_accounting_time: None,
}
.upsert(&mut transaction)
.await?;
@@ -1641,6 +1664,8 @@ pub async fn stripe_webhook(
net: None,
tax_last_updated: Some(Utc::now()),
tax_drift_loss: Some(0),
tax_transaction_version: None,
tax_platform_accounting_time: None,
};
if charge_status != ChargeStatus::Failed {
@@ -2004,6 +2029,8 @@ pub async fn stripe_webhook(
tax_platform_id: None,
tax_last_updated: Some(Utc::now()),
tax_drift_loss: Some(0),
tax_transaction_version: None,
tax_platform_accounting_time: None,
}
.upsert(&mut transaction)
.await?;

View File

@@ -182,6 +182,60 @@ impl Client {
.await
}
pub async fn negate_or_create_partial_negation(
&self,
original_txn_anrok_id: String,
original_txn_version: i32,
original_txn_tax_amount_with_tax: i64,
body: &Transaction,
) -> Result<(), AnrokError> {
let refund_amount = body
.fields
.line_items
.iter()
.map(|l| l.amount_in_smallest_denominations)
.sum::<i64>();
if -refund_amount == original_txn_tax_amount_with_tax {
self.create_full_negation(
original_txn_anrok_id,
original_txn_version,
body.id.clone(),
)
.await?;
} else {
self.create_or_update_txn(body).await?;
}
Ok(())
}
pub async fn create_full_negation(
&self,
original_txn_anrok_id: String,
original_txn_version: i32,
new_txn_id: String,
) -> Result<EmptyResponse, AnrokError> {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NegationBody {
original_transaction_id: String,
new_transaction_id: String,
original_transaction_expected_version: i32,
}
self.make_request(
Method::POST,
"/v1/seller/transactions/createNegation",
Some(&NegationBody {
original_transaction_id: original_txn_anrok_id,
new_transaction_id: new_txn_id,
original_transaction_expected_version: original_txn_version,
}),
)
.await
}
pub async fn create_or_update_txn(
&self,
body: &Transaction,