Cleanup + fixes to index_billing/index_subscriptions (#4457)

* Parse refunds

* Cleanup index subscriptions/index billing

* chore: query cache, clippy, fmt
This commit is contained in:
François-Xavier Talbot
2025-10-03 14:01:52 +01:00
committed by GitHub
parent 24504cb94d
commit 7e84659249
2 changed files with 749 additions and 715 deletions

View File

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE users\n SET badges = $1\n WHERE (id = $2)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Int8"
]
},
"nullable": []
},
"hash": "f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc"
}

View File

@@ -29,55 +29,9 @@ use futures::stream::{FuturesUnordered, StreamExt};
use sqlx::PgPool; use sqlx::PgPool;
use std::collections::HashSet; use std::collections::HashSet;
use std::str::FromStr; use std::str::FromStr;
use std::time::Instant;
use stripe::{self, Currency}; use stripe::{self, Currency};
use tracing::{error, info, warn}; use tracing::{debug, error, info, warn};
pub async fn index_subscriptions(
pool: PgPool,
redis: RedisPool,
stripe_client: stripe::Client,
anrok_client: anrok::Client,
) {
info!("Indexing subscriptions");
async fn anrok_api_operations(
pool: PgPool,
redis: RedisPool,
stripe_client: stripe::Client,
anrok_client: anrok::Client,
) {
let then = std::time::Instant::now();
let result = update_tax_amounts(
pool.clone(),
redis.clone(),
anrok_client.clone(),
stripe_client.clone(),
100,
)
.await;
if let Err(e) = result {
warn!("Error updating tax amount on charges: {:?}", e);
}
let result = update_tax_transactions(
pool,
redis,
anrok_client,
stripe_client,
100,
)
.await;
if let Err(e) = result {
warn!("Error updating tax transactions: {:?}", e);
}
info!(
"Updating tax amounts and Anrok transactions took {:?}",
then.elapsed()
);
}
/// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching /// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching
/// Anrok API limits. /// Anrok API limits.
@@ -85,16 +39,14 @@ pub async fn index_subscriptions(
/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up /// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up
/// to the specified limit of processed charges. /// to the specified limit of processed charges.
async fn update_tax_amounts( async fn update_tax_amounts(
pg: PgPool, pg: &PgPool,
redis: RedisPool, redis: &RedisPool,
anrok_client: anrok::Client, anrok_client: &anrok::Client,
stripe_client: stripe::Client, stripe_client: &stripe::Client,
limit: i64, limit: i64,
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
let mut interval = let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
tokio::time::interval(std::time::Duration::from_secs(1)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut processed_charges = 0; let mut processed_charges = 0;
@@ -182,6 +134,8 @@ pub async fn index_subscriptions(
) )
.await?; .await?;
// A customer should have a default payment method if they have an active subscription.
let payment_method = customer let payment_method = customer
.invoice_settings .invoice_settings
.and_then(|x| { .and_then(|x| {
@@ -263,8 +217,7 @@ pub async fn index_subscriptions(
let subscription_id = let subscription_id =
charge.subscription_id.ok_or_else(|| { charge.subscription_id.ok_or_else(|| {
ApiError::InvalidInput( ApiError::InvalidInput(
"Charge has no subscription ID" "Charge has no subscription ID".to_owned(),
.to_owned(),
) )
})?; })?;
@@ -283,7 +236,7 @@ pub async fn index_subscriptions(
currency: charge.currency_code.clone(), currency: charge.currency_code.clone(),
}, },
} }
.insert(charge.user_id, &mut txn, &redis) .insert(charge.user_id, &mut txn, redis)
.await?; .await?;
charge.tax_amount = new_tax_amount; charge.tax_amount = new_tax_amount;
@@ -315,17 +268,166 @@ pub async fn index_subscriptions(
/// ///
/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up /// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up
/// to the specified limit of processed charges. /// to the specified limit of processed charges.
async fn update_tax_transactions( async fn update_anrok_transactions(
pg: PgPool, pg: &PgPool,
redis: RedisPool, redis: &RedisPool,
anrok_client: anrok::Client, anrok_client: &anrok::Client,
stripe_client: stripe::Client, stripe_client: &stripe::Client,
limit: i64, limit: i64,
) -> Result<(), ApiError> { ) -> Result<(), ApiError> {
let mut interval = async fn process_charge(
tokio::time::interval(std::time::Duration::from_secs(1)); stripe_client: &stripe::Client,
interval txn: &mut sqlx::PgTransaction<'_>,
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); redis: &RedisPool,
anrok_client: &anrok::Client,
mut c: DBCharge,
) -> Result<(), ApiError> {
let (customer_address, tax_platform_id) = 'a: {
let (pi, tax_platform_id) = if c.type_ == ChargeType::Refund {
// the payment_platform_id should be an re or a pyr
let refund_id: stripe::RefundId = c
.payment_platform_id
.as_ref()
.and_then(|x| x.parse().ok())
.ok_or_else(|| {
ApiError::InvalidInput(
"Refund charge has no or an invalid refund ID"
.to_owned(),
)
})?;
let refund = stripe::Refund::retrieve(
stripe_client,
&refund_id,
&["payment_intent.payment_method"],
)
.await?;
let pi = refund
.payment_intent
.and_then(|x| x.into_object())
.ok_or_else(|| {
ApiError::InvalidInput(
"Refund charge has no payment intent".to_owned(),
)
})?;
(pi, anrok::transaction_id_stripe_pyr(&refund_id))
} else {
let stripe_id: stripe::PaymentIntentId = c
.payment_platform_id
.as_ref()
.and_then(|x| x.parse().ok())
.ok_or_else(|| {
ApiError::InvalidInput(
"Charge has no payment platform ID".to_owned(),
)
})?;
// Attempt retrieving the address via the payment intent's payment method
let pi = stripe::PaymentIntent::retrieve(
stripe_client,
&stripe_id,
&["payment_method"],
)
.await?;
let anrok_id = anrok::transaction_id_stripe_pi(&stripe_id);
(pi, anrok_id)
};
let pi_stripe_address = pi
.payment_method
.and_then(|x| x.into_object())
.and_then(|x| x.billing_details.address);
match pi_stripe_address {
Some(address) => break 'a (address, tax_platform_id),
None => {
warn!(
"A PaymentMethod for '{:?}' has no address; falling back to the customer's address",
pi.customer.map(|x| x.id())
);
}
};
let stripe_customer_id =
DBUser::get_id(c.user_id, &mut **txn, redis)
.await?
.ok_or_else(|| {
ApiError::from(DatabaseError::Database(
sqlx::Error::RowNotFound,
))
})
.and_then(|user| {
user.stripe_customer_id.ok_or_else(|| {
ApiError::InvalidInput(
"User has no Stripe customer ID".to_owned(),
)
})
})?;
let customer_id = stripe_customer_id.parse().map_err(|e| {
ApiError::InvalidInput(format!(
"Charge's Stripe customer ID was invalid ({e})"
))
})?;
let customer =
stripe::Customer::retrieve(stripe_client, &customer_id, &[])
.await?;
let address = customer.address.ok_or_else(|| {
ApiError::InvalidInput(
format!("Could not find any address for Stripe customer of user '{}'", to_base62(c.user_id.0 as u64))
)
})?;
(address, tax_platform_id)
};
let tax_id = DBProductsTaxIdentifier::get_price(c.price_id, &mut **txn)
.await?
.ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?;
// Note: if the tax amount that was charged to the customer is *different* than
// what it *should* be NOW, we will take on a loss here.
let should_have_collected = anrok_client
.create_or_update_txn(&anrok::Transaction {
id: tax_platform_id.clone(),
fields: anrok::TransactionFields {
customer_address: anrok::Address::from_stripe_address(
&customer_address,
),
currency_code: c.currency_code.clone(),
accounting_time: c.due,
accounting_time_zone: anrok::AccountingTimeZone::Utc,
line_items: vec![
anrok::LineItem::new_including_tax_amount(
tax_id.tax_processor_id,
c.tax_amount + c.amount,
),
],
},
})
.await?
.tax_amount_to_collect;
let drift = should_have_collected - c.tax_amount;
c.tax_drift_loss = Some(drift);
c.tax_platform_id = Some(tax_platform_id);
c.upsert(txn).await?;
Ok(())
}
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut processed_charges = 0; let mut processed_charges = 0;
@@ -342,131 +444,21 @@ pub async fn index_subscriptions(
break Ok(()); break Ok(());
} }
for mut c in charges { for c in charges {
processed_charges += 1; processed_charges += 1;
let payment_intent_id = c let charge_id = to_base62(c.id.0 as u64);
.payment_platform_id let user_id = to_base62(c.user_id.0 as u64);
.as_deref()
.and_then(|x| x.parse().ok())
.ok_or_else(|| {
ApiError::InvalidInput(
"Charge has no or an invalid payment platform ID"
.to_owned(),
)
})?;
let customer_address = 'a: { let result =
let stripe_id: stripe::PaymentIntentId = c process_charge(stripe_client, &mut txn, redis, anrok_client, c)
.payment_platform_id .await;
.as_ref()
.and_then(|x| x.parse().ok())
.ok_or_else(|| {
ApiError::InvalidInput(
"Charge has no payment platform ID".to_owned(),
)
})?;
// Attempt retrieving the address via the payment intent's payment method if let Err(e) = result {
warn!(
let pi = stripe::PaymentIntent::retrieve( "Error processing charge '{charge_id}' for user '{user_id}': {e}"
&stripe_client, );
&stripe_id,
&["payment_method"],
)
.await?;
let pi_stripe_address = pi
.payment_method
.and_then(|x| x.into_object())
.and_then(|x| x.billing_details.address);
match pi_stripe_address {
Some(address) => break 'a address,
None => {
warn!("PaymentMethod had no address");
} }
};
let stripe_customer_id =
DBUser::get_id(c.user_id, &pg, &redis)
.await?
.ok_or_else(|| {
ApiError::from(DatabaseError::Database(
sqlx::Error::RowNotFound,
))
})
.and_then(|user| {
user.stripe_customer_id.ok_or_else(|| {
ApiError::InvalidInput(
"User has no Stripe customer ID"
.to_owned(),
)
})
})?;
let customer_id =
stripe_customer_id.parse().map_err(|e| {
ApiError::InvalidInput(format!(
"Charge's Stripe customer ID was invalid ({e})"
))
})?;
let customer = stripe::Customer::retrieve(
&stripe_client,
&customer_id,
&[],
)
.await?;
customer.address.ok_or_else(|| {
ApiError::InvalidInput(
"Stripe customer had no address".to_owned(),
)
})?
};
let tax_id =
DBProductsTaxIdentifier::get_price(c.price_id, &pg)
.await?
.ok_or_else(|| {
DatabaseError::Database(sqlx::Error::RowNotFound)
})?;
let tax_platform_id =
anrok::transaction_id_stripe_pi(&payment_intent_id);
// Note: if the tax amount that was charged to the customer is *different* than
// what it *should* be NOW, we will take on a loss here.
let should_have_collected = anrok_client
.create_or_update_txn(&anrok::Transaction {
id: tax_platform_id.clone(),
fields: anrok::TransactionFields {
customer_address:
anrok::Address::from_stripe_address(
&customer_address,
),
currency_code: c.currency_code.clone(),
accounting_time: c.due,
accounting_time_zone:
anrok::AccountingTimeZone::Utc,
line_items: vec![
anrok::LineItem::new_including_tax_amount(
tax_id.tax_processor_id,
c.tax_amount + c.amount,
),
],
},
})
.await?
.tax_amount_to_collect;
let drift = should_have_collected - c.tax_amount;
c.tax_drift_loss = Some(drift);
c.tax_platform_id = Some(tax_platform_id);
c.upsert(&mut txn).await?;
} }
txn.commit().await?; txn.commit().await?;
@@ -477,201 +469,6 @@ pub async fn index_subscriptions(
} }
} }
anrok_api_operations(
pool.clone(),
redis.clone(),
stripe_client.clone(),
anrok_client.clone(),
)
.await;
let res = async {
info!("Gathering charges to unprovision");
let mut transaction = pool.begin().await?;
let mut clear_cache_users = Vec::new();
// If an active subscription has:
// - A canceled charge due now
// - An expiring charge due now
// - A failed charge more than two days ago
// It should be unprovisioned.
let all_charges = DBCharge::get_unprovision(&pool).await?;
let mut all_subscriptions =
user_subscription_item::DBUserSubscription::get_many(
&all_charges
.iter()
.filter_map(|x| x.subscription_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
&pool,
)
.await?;
let subscription_prices = product_item::DBProductPrice::get_many(
&all_subscriptions
.iter()
.map(|x| x.price_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
&pool,
)
.await?;
let subscription_products = product_item::DBProduct::get_many(
&subscription_prices
.iter()
.map(|x| x.product_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
&pool,
)
.await?;
let users = DBUser::get_many_ids(
&all_subscriptions
.iter()
.map(|x| x.user_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
&pool,
&redis,
)
.await?;
for charge in all_charges {
info!("Indexing charge '{}'", to_base62(charge.id.0 as u64));
let Some(subscription) = all_subscriptions
.iter_mut()
.find(|x| Some(x.id) == charge.subscription_id)
else {
continue;
};
if subscription.status == SubscriptionStatus::Unprovisioned {
continue;
}
let Some(product_price) = subscription_prices
.iter()
.find(|x| x.id == subscription.price_id)
else {
continue;
};
let Some(product) = subscription_products
.iter()
.find(|x| x.id == product_price.product_id)
else {
continue;
};
let Some(user) =
users.iter().find(|x| x.id == subscription.user_id)
else {
continue;
};
let unprovisioned = match product.metadata {
ProductMetadata::Midas => {
let badges = user.badges - Badges::MIDAS;
sqlx::query!(
"
UPDATE users
SET badges = $1
WHERE (id = $2)
",
badges.bits() as i64,
user.id as DBUserId,
)
.execute(&mut *transaction)
.await?;
true
}
ProductMetadata::Pyro { .. }
| ProductMetadata::Medal { .. } => 'server: {
let server_id = match &subscription.metadata {
Some(SubscriptionMetadata::Pyro { id, region: _ }) => {
id
}
Some(SubscriptionMetadata::Medal { id }) => id,
_ => break 'server true,
};
let res = reqwest::Client::new()
.post(format!(
"{}/modrinth/v0/servers/{}/suspend",
dotenvy::var("ARCHON_URL")?,
server_id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"reason": if charge.status == ChargeStatus::Cancelled || charge.status == ChargeStatus::Expiring {
"cancelled"
} else {
"paymentfailed"
}
}))
.send()
.await;
if let Err(e) = res {
warn!("Error suspending pyro server: {:?}", e);
false
} else {
true
}
}
};
if unprovisioned {
subscription.status = SubscriptionStatus::Unprovisioned;
subscription.upsert(&mut transaction).await?;
}
clear_cache_users.push(user.id);
}
crate::database::models::DBUser::clear_caches(
&clear_cache_users
.into_iter()
.map(|x| (x, None))
.collect::<Vec<_>>(),
&redis,
)
.await?;
transaction.commit().await?;
// If an offer redeemal has been processing for over 5 minutes, it should be set pending.
UserRedeemal::update_stuck_5_minutes(&pool).await?;
// If an offer redeemal is pending, try processing it.
// Try processing it.
let pending_redeemals = UserRedeemal::get_pending(&pool, 100).await?;
for redeemal in pending_redeemals {
if let Err(error) =
try_process_user_redeemal(&pool, &redis, redeemal).await
{
warn!(%error, "Failed to process a redeemal.")
}
}
Ok::<(), ApiError>(())
};
if let Err(e) = res.await {
warn!("Error indexing subscriptions: {:?}", e);
}
info!("Done indexing subscriptions");
}
/// Attempts to process a user redeemal. /// Attempts to process a user redeemal.
/// ///
/// Returns `Ok` if the entry has been succesfully processed, or will not be processed. /// Returns `Ok` if the entry has been succesfully processed, or will not be processed.
@@ -832,16 +629,8 @@ pub async fn try_process_user_redeemal(
Ok(()) Ok(())
} }
pub async fn index_billing( pub async fn cancel_failing_charges(pool: &PgPool) -> Result<(), ApiError> {
stripe_client: stripe::Client, let charges_to_cancel = DBCharge::get_cancellable(pool).await?;
anrok_client: anrok::Client,
pool: PgPool,
redis: RedisPool,
) {
info!("Indexing billing queue");
let res = async {
// If a charge has continuously failed for more than a month, it should be cancelled
let charges_to_cancel = DBCharge::get_cancellable(&pool).await?;
for mut charge in charges_to_cancel { for mut charge in charges_to_cancel {
charge.status = ChargeStatus::Cancelled; charge.status = ChargeStatus::Cancelled;
@@ -851,8 +640,16 @@ pub async fn index_billing(
transaction.commit().await?; transaction.commit().await?;
} }
// If a charge is open and due or has been attempted more than two days ago, it should be processed Ok(())
let charges_to_do = DBCharge::get_chargeable(&pool).await?; }
pub async fn process_chargeable_charges(
pool: &PgPool,
redis: &RedisPool,
stripe_client: &stripe::Client,
anrok_client: &anrok::Client,
) -> Result<(), ApiError> {
let charges_to_do = DBCharge::get_chargeable(pool).await?;
let prices = product_item::DBProductPrice::get_many( let prices = product_item::DBProductPrice::get_many(
&charges_to_do &charges_to_do
@@ -861,7 +658,7 @@ pub async fn index_billing(
.collect::<HashSet<_>>() .collect::<HashSet<_>>()
.into_iter() .into_iter()
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
&pool, pool,
) )
.await?; .await?;
@@ -872,8 +669,8 @@ pub async fn index_billing(
.collect::<HashSet<_>>() .collect::<HashSet<_>>()
.into_iter() .into_iter()
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
&pool, pool,
&redis, redis,
) )
.await?; .await?;
@@ -884,8 +681,7 @@ pub async fn index_billing(
continue; continue;
}; };
let Some(user) = users.iter().find(|x| x.id == charge.user_id) let Some(user) = users.iter().find(|x| x.id == charge.user_id) else {
else {
continue; continue;
}; };
@@ -902,10 +698,10 @@ pub async fn index_billing(
let user = User::from_full(user.clone()); let user = User::from_full(user.clone());
let result = create_or_update_payment_intent( let result = create_or_update_payment_intent(
&pool, pool,
&redis, redis,
&stripe_client, stripe_client,
&anrok_client, anrok_client,
PaymentBootstrapOptions { PaymentBootstrapOptions {
user: &user, user: &user,
payment_intent: None, payment_intent: None,
@@ -913,9 +709,7 @@ pub async fn index_billing(
attached_charge: AttachedCharge::UseExisting { attached_charge: AttachedCharge::UseExisting {
charge: charge.clone(), charge: charge.clone(),
}, },
currency: CurrencyMode::Set( currency: CurrencyMode::Set(currency),
currency,
),
attach_payment_metadata: None, attach_payment_metadata: None,
}, },
) )
@@ -940,7 +734,9 @@ pub async fn index_billing(
charge.tax_amount = tax; charge.tax_amount = tax;
charge.payment_platform = PaymentPlatform::Stripe; charge.payment_platform = PaymentPlatform::Stripe;
} else { } else {
error!("Payment bootstrap succeeded but no payment intent was created"); error!(
"Payment bootstrap succeeded but no payment intent was created"
);
failure = true; failure = true;
} }
} }
@@ -960,13 +756,266 @@ pub async fn index_billing(
transaction.commit().await?; transaction.commit().await?;
} }
Ok::<(), ApiError>(()) Ok(())
} }
async fn unprovision_subscriptions(
pool: &PgPool,
redis: &RedisPool,
) -> Result<(), ApiError> {
info!("Gathering charges to unprovision");
let mut transaction = pool.begin().await?;
let mut clear_cache_users = Vec::new();
// If an active subscription has:
// - A canceled charge due now
// - An expiring charge due now
// - A failed charge more than two days ago
// It should be unprovisioned
let all_charges = DBCharge::get_unprovision(pool).await?;
let mut all_subscriptions =
user_subscription_item::DBUserSubscription::get_many(
&all_charges
.iter()
.filter_map(|x| x.subscription_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
pool,
)
.await?;
let subscription_prices = product_item::DBProductPrice::get_many(
&all_subscriptions
.iter()
.map(|x| x.price_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
pool,
)
.await?;
let subscription_products = product_item::DBProduct::get_many(
&subscription_prices
.iter()
.map(|x| x.product_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
pool,
)
.await?;
let users = DBUser::get_many_ids(
&all_subscriptions
.iter()
.map(|x| x.user_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
pool,
redis,
)
.await?;
for charge in all_charges {
debug!("Unprovisioning charge '{}'", to_base62(charge.id.0 as u64));
let Some(subscription) = all_subscriptions
.iter_mut()
.find(|x| Some(x.id) == charge.subscription_id)
else {
continue;
};
if subscription.status == SubscriptionStatus::Unprovisioned {
continue;
}
let Some(product_price) = subscription_prices
.iter()
.find(|x| x.id == subscription.price_id)
else {
continue;
};
let Some(product) = subscription_products
.iter()
.find(|x| x.id == product_price.product_id)
else {
continue;
};
let Some(user) = users.iter().find(|x| x.id == subscription.user_id)
else {
continue;
};
let unprovisioned = match product.metadata {
ProductMetadata::Midas => {
let badges = user.badges - Badges::MIDAS;
sqlx::query!(
"
UPDATE users
SET badges = $1
WHERE (id = $2)
",
badges.bits() as i64,
user.id as DBUserId,
)
.execute(&mut *transaction)
.await?;
true
}
ProductMetadata::Pyro { .. }
| ProductMetadata::Medal { .. } => 'server: {
let server_id = match &subscription.metadata {
Some(SubscriptionMetadata::Pyro { id, region: _ }) => id,
Some(SubscriptionMetadata::Medal { id }) => id,
_ => break 'server true,
};
let res = reqwest::Client::new()
.post(format!(
"{}/modrinth/v0/servers/{}/suspend",
dotenvy::var("ARCHON_URL")?,
server_id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"reason": if charge.status == ChargeStatus::Cancelled || charge.status == ChargeStatus::Expiring {
"cancelled"
} else {
"paymentfailed"
}
}))
.send()
.await; .await;
if let Err(e) = res { if let Err(e) = res {
warn!("Error indexing billing queue: {:?}", e); warn!("Error suspending pyro server: {:?}", e);
false
} else {
true
} }
}
};
if unprovisioned {
subscription.status = SubscriptionStatus::Unprovisioned;
subscription.upsert(&mut transaction).await?;
}
clear_cache_users.push(user.id);
}
crate::database::models::DBUser::clear_caches(
&clear_cache_users
.into_iter()
.map(|x| (x, None))
.collect::<Vec<_>>(),
redis,
)
.await?;
transaction.commit().await?;
Ok(())
}
async fn process_redeemals(
pool: &PgPool,
redis: &RedisPool,
) -> Result<(), ApiError> {
// If an offer redeemal has been processing for over 5 minutes, it should be set pending.
UserRedeemal::update_stuck_5_minutes(pool).await?;
// If an offer redeemal is pending, try processing it.
// Try processing it.
let pending_redeemals = UserRedeemal::get_pending(pool, 100).await?;
for redeemal in pending_redeemals {
if let Err(error) =
try_process_user_redeemal(pool, redis, redeemal).await
{
warn!(%error, "Failed to process a redeemal.")
}
}
Ok(())
}
pub async fn index_billing(
stripe_client: stripe::Client,
anrok_client: anrok::Client,
pool: PgPool,
redis: RedisPool,
) {
info!("Indexing billing queue");
run_and_time("cancel_failing_charges", cancel_failing_charges(&pool)).await;
run_and_time(
"process_chargeable_charges",
process_chargeable_charges(
&pool,
&redis,
&stripe_client,
&anrok_client,
),
)
.await;
info!("Done indexing billing queue"); info!("Done indexing billing queue");
} }
pub async fn index_subscriptions(
pool: PgPool,
redis: RedisPool,
stripe_client: stripe::Client,
anrok_client: anrok::Client,
) {
info!("Indexing subscriptions");
run_and_time(
"update_anrok_transactions",
update_anrok_transactions(
&pool,
&redis,
&anrok_client,
&stripe_client,
250,
),
)
.await;
run_and_time(
"update_tax_amounts",
update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 100),
)
.await;
run_and_time("process_redeemals", process_redeemals(&pool, &redis)).await;
run_and_time(
"unprovision_subscriptions",
unprovision_subscriptions(&pool, &redis),
)
.await;
info!("Done indexing subscriptions");
}
async fn run_and_time<F>(name: &'static str, fut: F)
where
F: Future<Output = Result<(), ApiError>>,
{
let then = Instant::now();
if let Err(error) = fut.await {
error!("Error in '{name}': {error}");
}
info!("Finished '{name}' in {}ms", then.elapsed().as_millis());
}