Credit subscriptions (#4575)

* Implement subscription crediting

* chore: query cache, clippy, fmt

* Improve code, improve query for next open charge

* chore: query cache, clippy, fmt

* Move server ID copy button up

* Node + region crediting

* Make it less ugly

* chore: query cache, clippy, fmt

* Bugfixes

* Fix lint

* Adjust migration

* Adjust migration

* Remove billing change

* Move DEFAULT_CREDIT_EMAIL_MESSAGE to utils.ts

* Lint

* Merge

* bump clickhouse, disable validation

* tombi fmt

* Update cargo lock
This commit is contained in:
François-Xavier Talbot
2025-10-20 18:35:44 +01:00
committed by GitHub
parent 79502a19d6
commit eeed4e572d
22 changed files with 1052 additions and 8 deletions

View File

@@ -233,7 +233,10 @@ impl DBCharge {
) -> Result<Option<DBCharge>, DatabaseError> {
let user_subscription_id = user_subscription_id.0;
let res = select_charges_with_predicate!(
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'expiring' OR status = 'cancelled' OR status = 'failed')",
"WHERE
subscription_id = $1
AND (status = 'open' OR status = 'expiring' OR status = 'cancelled' OR status = 'failed')
ORDER BY due ASC LIMIT 1",
user_subscription_id
)
.fetch_optional(exec)

View File

@@ -35,6 +35,7 @@ pub mod user_subscription_item;
pub mod users_compliance;
pub mod users_notifications_preferences_item;
pub mod users_redeemals;
pub mod users_subscriptions_credits;
pub mod version_item;
pub use affiliate_code_item::DBAffiliateCode;

View File

@@ -160,6 +160,32 @@ impl DBUserSubscription {
Ok(())
}
pub async fn get_many_by_server_ids(
server_ids: &[String],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBUserSubscription>, DatabaseError> {
if server_ids.is_empty() {
return Ok(vec![]);
}
let results = sqlx::query_as!(
UserSubscriptionQueryResult,
r#"
SELECT us.id, us.user_id, us.price_id, us.interval, us.created, us.status, us.metadata
FROM users_subscriptions us
WHERE us.metadata->>'type' = 'pyro' AND us.metadata->>'id' = ANY($1::text[])
"#,
server_ids
)
.fetch_all(exec)
.await?;
Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
}
pub struct SubscriptionWithCharge {

View File

@@ -0,0 +1,82 @@
use crate::database::models::{DBUserId, DBUserSubscriptionId};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::query_scalar;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DBUserSubscriptionCredit {
pub id: i32,
pub subscription_id: DBUserSubscriptionId,
pub user_id: DBUserId,
pub creditor_id: DBUserId,
pub days: i32,
pub previous_due: DateTime<Utc>,
pub next_due: DateTime<Utc>,
pub created: DateTime<Utc>,
}
impl DBUserSubscriptionCredit {
/// Inserts this audit entry and sets its id.
pub async fn insert<'a, E>(&mut self, exec: E) -> sqlx::Result<()>
where
E: sqlx::PgExecutor<'a>,
{
let id = query_scalar!(
r#"
INSERT INTO users_subscriptions_credits
(subscription_id, user_id, creditor_id, days, previous_due, next_due)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id
"#,
self.subscription_id.0,
self.user_id.0,
self.creditor_id.0,
self.days,
self.previous_due,
self.next_due,
)
.fetch_one(exec)
.await?;
self.id = id;
Ok(())
}
pub async fn insert_many(
exec: &mut sqlx::Transaction<'_, sqlx::Postgres>,
subscription_ids: &[DBUserSubscriptionId],
user_ids: &[DBUserId],
creditor_ids: &[DBUserId],
days: &[i32],
previous_dues: &[DateTime<Utc>],
next_dues: &[DateTime<Utc>],
) -> sqlx::Result<()> {
debug_assert_eq!(subscription_ids.len(), user_ids.len());
debug_assert_eq!(user_ids.len(), creditor_ids.len());
debug_assert_eq!(creditor_ids.len(), days.len());
debug_assert_eq!(days.len(), previous_dues.len());
debug_assert_eq!(previous_dues.len(), next_dues.len());
let subs: Vec<i64> = subscription_ids.iter().map(|x| x.0).collect();
let users: Vec<i64> = user_ids.iter().map(|x| x.0).collect();
let creditors: Vec<i64> = creditor_ids.iter().map(|x| x.0).collect();
sqlx::query!(
r#"
INSERT INTO users_subscriptions_credits
(subscription_id, user_id, creditor_id, days, previous_due, next_due)
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::int[], $5::timestamptz[], $6::timestamptz[])
"#,
&subs[..],
&users[..],
&creditors[..],
&days[..],
&previous_dues[..],
&next_dues[..],
)
.execute(&mut **exec)
.await?;
Ok(())
}
}

View File

@@ -21,6 +21,7 @@ use crate::database::ReadOnlyPgPool;
use crate::queue::billing::{index_billing, index_subscriptions};
use crate::queue::moderation::AutomatedModerationQueue;
use crate::util::anrok;
use crate::util::archon::ArchonClient;
use crate::util::env::{parse_strings_from_var, parse_var};
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
use sync::friends::handle_pubsub;
@@ -64,6 +65,7 @@ pub struct LabrinthConfig {
pub stripe_client: stripe::Client,
pub anrok_client: anrok::Client,
pub email_queue: web::Data<EmailQueue>,
pub archon_client: web::Data<ArchonClient>,
pub gotenberg_client: GotenbergClient,
}
@@ -283,6 +285,10 @@ pub fn app_setup(
stripe_client,
anrok_client,
gotenberg_client,
archon_client: web::Data::new(
ArchonClient::from_env()
.expect("ARCHON_URL and PYRO_API_KEY must be set"),
),
email_queue: web::Data::new(email_queue),
}
}
@@ -315,9 +321,9 @@ pub fn app_config(
.app_data(web::Data::new(labrinth_config.ip_salt.clone()))
.app_data(web::Data::new(labrinth_config.analytics_queue.clone()))
.app_data(web::Data::new(labrinth_config.clickhouse.clone()))
.app_data(labrinth_config.maxmind.clone())
.app_data(labrinth_config.active_sockets.clone())
.app_data(labrinth_config.automated_moderation_queue.clone())
.app_data(labrinth_config.archon_client.clone())
.app_data(web::Data::new(labrinth_config.stripe_client.clone()))
.app_data(web::Data::new(labrinth_config.anrok_client.clone()))
.app_data(labrinth_config.rate_limiter.clone())
@@ -478,7 +484,6 @@ pub fn check_env_vars() -> bool {
failed |= check_var::<String>("CLICKHOUSE_PASSWORD");
failed |= check_var::<String>("CLICKHOUSE_DATABASE");
failed |= check_var::<String>("MAXMIND_ACCOUNT_ID");
failed |= check_var::<String>("MAXMIND_LICENSE_KEY");
failed |= check_var::<String>("FLAME_ANVIL_URL");

View File

@@ -109,6 +109,13 @@ pub enum LegacyNotificationBody {
amount: String,
service: String,
},
SubscriptionCredited {
subscription_id: UserSubscriptionId,
days: i32,
previous_due: DateTime<Utc>,
next_due: DateTime<Utc>,
header_message: Option<String>,
},
PatCreated {
token_name: String,
},
@@ -219,6 +226,9 @@ impl LegacyNotification {
NotificationBody::TaxNotification { .. } => {
Some("tax_notification".to_string())
}
NotificationBody::SubscriptionCredited { .. } => {
Some("subscription_credited".to_string())
}
NotificationBody::PayoutAvailable { .. } => {
Some("payout_available".to_string())
}
@@ -396,6 +406,19 @@ impl LegacyNotification {
NotificationBody::PaymentFailed { amount, service } => {
LegacyNotificationBody::PaymentFailed { amount, service }
}
NotificationBody::SubscriptionCredited {
subscription_id,
days,
previous_due,
next_due,
header_message,
} => LegacyNotificationBody::SubscriptionCredited {
subscription_id,
days,
previous_due,
next_due,
header_message,
},
NotificationBody::Unknown => LegacyNotificationBody::Unknown,
};

View File

@@ -46,6 +46,7 @@ pub enum NotificationType {
PasswordChanged,
PasswordRemoved,
EmailChanged,
SubscriptionCredited,
PaymentFailed,
TaxNotification,
PatCreated,
@@ -78,6 +79,7 @@ impl NotificationType {
NotificationType::PasswordChanged => "password_changed",
NotificationType::PasswordRemoved => "password_removed",
NotificationType::EmailChanged => "email_changed",
NotificationType::SubscriptionCredited => "subscription_credited",
NotificationType::PaymentFailed => "payment_failed",
NotificationType::TaxNotification => "tax_notification",
NotificationType::PatCreated => "pat_created",
@@ -114,6 +116,7 @@ impl NotificationType {
"password_changed" => NotificationType::PasswordChanged,
"password_removed" => NotificationType::PasswordRemoved,
"email_changed" => NotificationType::EmailChanged,
"subscription_credited" => NotificationType::SubscriptionCredited,
"payment_failed" => NotificationType::PaymentFailed,
"tax_notification" => NotificationType::TaxNotification,
"payout_available" => NotificationType::PayoutAvailable,
@@ -220,6 +223,13 @@ pub enum NotificationBody {
new_email: String,
to_email: String,
},
SubscriptionCredited {
subscription_id: UserSubscriptionId,
days: i32,
previous_due: DateTime<Utc>,
next_due: DateTime<Utc>,
header_message: Option<String>,
},
PaymentFailed {
amount: String,
service: String,
@@ -312,6 +322,9 @@ impl NotificationBody {
NotificationBody::EmailChanged { .. } => {
NotificationType::EmailChanged
}
NotificationBody::SubscriptionCredited { .. } => {
NotificationType::SubscriptionCredited
}
NotificationBody::PaymentFailed { .. } => {
NotificationType::PaymentFailed
}
@@ -554,6 +567,12 @@ impl From<DBNotification> for Notification {
"#".to_string(),
vec![],
),
NotificationBody::SubscriptionCredited { .. } => (
"Subscription credited".to_string(),
"Your subscription has been credited with additional service time.".to_string(),
"#".to_string(),
vec![],
),
NotificationBody::PayoutAvailable { .. } => (
"Payout available".to_string(),
"A payout is available!".to_string(),

View File

@@ -42,6 +42,12 @@ const TAXNOTIFICATION_BILLING_INTERVAL: &str =
const TAXNOTIFICATION_DUE: &str = "taxnotification.due";
const TAXNOTIFICATION_SERVICE: &str = "taxnotification.service";
const CREDIT_DAYS: &str = "credit.days_formatted";
const CREDIT_PREVIOUS_DUE: &str = "credit.previous_due";
const CREDIT_NEXT_DUE: &str = "credit.next_due";
const CREDIT_HEADER_MESSAGE: &str = "credit.header_message";
const CREDIT_SUBSCRIPTION_TYPE: &str = "credit.subscription.type";
const PAYMENTFAILED_AMOUNT: &str = "paymentfailed.amount";
const PAYMENTFAILED_SERVICE: &str = "paymentfailed.service";
@@ -676,6 +682,47 @@ async fn collect_template_variables(
Ok(EmailTemplate::Static(map))
}
NotificationBody::SubscriptionCredited {
subscription_id,
days,
previous_due,
next_due,
header_message,
} => {
map.insert(
CREDIT_DAYS,
format!("{days} day{}", if *days == 1 { "" } else { "s" }),
);
map.insert(CREDIT_PREVIOUS_DUE, date_human_readable(*previous_due));
map.insert(CREDIT_NEXT_DUE, date_human_readable(*next_due));
map.insert(SUBSCRIPTION_ID, to_base62(subscription_id.0));
// Only insert header message if provided; frontend sets default fallback
if let Some(h) = header_message.clone() {
map.insert(CREDIT_HEADER_MESSAGE, h);
}
// Derive subscription type label for templates
// Resolve product metadata via price_id join
if let Some(info) = crate::database::models::user_subscription_item::DBUserSubscription::get(
(*subscription_id).into(),
&mut **exec,
)
.await
.ok()
.flatten()
&& let Ok(Some(pinfo)) = crate::database::models::products_tax_identifier_item::product_info_by_product_price_id(info.price_id, &mut **exec).await {
let label = match pinfo.product_metadata {
crate::models::billing::ProductMetadata::Pyro { .. } => "server".to_string(),
crate::models::billing::ProductMetadata::Medal { .. } => "server".to_string(),
crate::models::billing::ProductMetadata::Midas => "Modrinth+".to_string(),
};
map.insert(CREDIT_SUBSCRIPTION_TYPE, label);
}
Ok(EmailTemplate::Static(map))
}
NotificationBody::Custom {
title,
body_md,

View File

@@ -3,6 +3,7 @@ use crate::auth::get_user_from_headers;
use crate::database::models::charge_item::DBCharge;
use crate::database::models::notification_item::NotificationBuilder;
use crate::database::models::products_tax_identifier_item::product_info_by_product_price_id;
use crate::database::models::users_subscriptions_credits::DBUserSubscriptionCredit;
use crate::database::models::{
charge_item, generate_charge_id, product_item, user_subscription_item,
};
@@ -48,6 +49,7 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.service(edit_payment_method)
.service(remove_payment_method)
.service(charges)
.service(credit)
.service(active_servers)
.service(initiate_payment)
.service(stripe_webhook)
@@ -2188,3 +2190,238 @@ pub async fn stripe_webhook(
}
pub mod payments;
#[allow(clippy::too_many_arguments)]
async fn apply_credit_many_in_txn(
transaction: &mut Transaction<'_, Postgres>,
redis: &RedisPool,
current_user_id: crate::database::models::ids::DBUserId,
subscription_ids: Vec<crate::models::ids::UserSubscriptionId>,
days: i32,
send_email: bool,
message: String,
) -> Result<(), ApiError> {
use crate::database::models::ids::DBUserSubscriptionId;
let mut credit_sub_ids: Vec<DBUserSubscriptionId> =
Vec::with_capacity(subscription_ids.len());
let mut credit_user_ids: Vec<crate::database::models::ids::DBUserId> =
Vec::with_capacity(subscription_ids.len());
let mut credit_creditor_ids: Vec<crate::database::models::ids::DBUserId> =
Vec::with_capacity(subscription_ids.len());
let mut credit_days: Vec<i32> = Vec::with_capacity(subscription_ids.len());
let mut credit_prev_dues: Vec<chrono::DateTime<chrono::Utc>> =
Vec::with_capacity(subscription_ids.len());
let mut credit_next_dues: Vec<chrono::DateTime<chrono::Utc>> =
Vec::with_capacity(subscription_ids.len());
let subs_ids: Vec<DBUserSubscriptionId> = subscription_ids
.iter()
.map(|id| DBUserSubscriptionId(id.0 as i64))
.collect();
let subs = user_subscription_item::DBUserSubscription::get_many(
&subs_ids,
&mut **transaction,
)
.await?;
for subscription in subs {
let mut open_charge = charge_item::DBCharge::get_open_subscription(
subscription.id,
&mut **transaction,
)
.await?
.ok_or_else(|| {
ApiError::InvalidInput(format!(
"Could not find open charge for subscription {}",
to_base62(subscription.id.0 as u64)
))
})?;
let previous_due = open_charge.due;
open_charge.due = previous_due + Duration::days(days as i64);
let next_due = open_charge.due;
open_charge.upsert(&mut *transaction).await?;
credit_sub_ids.push(subscription.id);
credit_user_ids.push(subscription.user_id);
credit_creditor_ids.push(current_user_id);
credit_days.push(days);
credit_prev_dues.push(previous_due);
credit_next_dues.push(next_due);
if send_email {
NotificationBuilder {
body: NotificationBody::SubscriptionCredited {
subscription_id: subscription.id.into(),
days,
previous_due,
next_due,
header_message: Some(message.clone()),
},
}
.insert(subscription.user_id, &mut *transaction, redis)
.await?;
}
}
DBUserSubscriptionCredit::insert_many(
&mut *transaction,
&credit_sub_ids,
&credit_user_ids,
&credit_creditor_ids,
&credit_days,
&credit_prev_dues,
&credit_next_dues,
)
.await
.map_err(|e| ApiError::Internal(eyre::eyre!(e)))?;
Ok(())
}
#[derive(Deserialize)]
pub struct CreditRequest {
#[serde(flatten)]
pub target: CreditTarget,
pub days: i32,
pub send_email: bool,
pub message: String,
}
#[derive(Deserialize)]
#[serde(untagged)]
pub enum CreditTarget {
Subscriptions {
subscription_ids: Vec<crate::models::ids::UserSubscriptionId>,
},
Nodes {
nodes: Vec<String>,
},
Region {
region: String,
},
}
#[post("credit")]
pub async fn credit(
req: HttpRequest,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
archon_client: web::Data<crate::util::archon::ArchonClient>,
body: web::Json<CreditRequest>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
&**pool,
&redis,
&session_queue,
Scopes::SESSION_ACCESS,
)
.await?
.1;
if !user.role.is_admin() {
return Err(ApiError::CustomAuthentication(
"You do not have permission to credit subscriptions!".to_string(),
));
}
let CreditRequest {
target,
days,
send_email,
message,
} = body.into_inner();
if days <= 0 {
return Err(ApiError::InvalidInput(
"Days must be greater than zero".to_string(),
));
}
let mut transaction = pool.begin().await?;
match target {
CreditTarget::Subscriptions { subscription_ids } => {
if subscription_ids.is_empty() {
return Err(ApiError::InvalidInput(
"You must specify at least one subscription id".to_string(),
));
}
apply_credit_many_in_txn(
&mut transaction,
&redis,
crate::database::models::ids::DBUserId(user.id.0 as i64),
subscription_ids,
days,
send_email,
message,
)
.await?;
}
CreditTarget::Nodes { nodes } => {
if nodes.is_empty() {
return Err(ApiError::InvalidInput(
"You must specify at least one node hostname".to_string(),
));
}
let mut server_ids: Vec<String> = Vec::new();
for hostname in nodes {
let ids =
archon_client.get_servers_by_hostname(&hostname).await?;
server_ids.extend(ids);
}
server_ids.sort();
server_ids.dedup();
let subs = user_subscription_item::DBUserSubscription::get_many_by_server_ids(
&server_ids,
&mut *transaction,
)
.await?;
if subs.is_empty() {
return Err(ApiError::InvalidInput(
"No subscriptions found for provided nodes".to_string(),
));
}
apply_credit_many_in_txn(
&mut transaction,
&redis,
crate::database::models::ids::DBUserId(user.id.0 as i64),
subs.into_iter().map(|s| s.id.into()).collect(),
days,
send_email,
message,
)
.await?;
}
CreditTarget::Region { region } => {
let parsed_active =
archon_client.get_active_servers_by_region(&region).await?;
let subs = user_subscription_item::DBUserSubscription::get_many_by_server_ids(
&parsed_active,
&mut *transaction,
)
.await?;
if subs.is_empty() {
return Err(ApiError::InvalidInput(
"No subscriptions found for provided region".to_string(),
));
}
apply_credit_many_in_txn(
&mut transaction,
&redis,
crate::database::models::ids::DBUserId(user.id.0 as i64),
subs.into_iter().map(|s| s.id.into()).collect(),
days,
send_email,
message,
)
.await?;
}
}
transaction.commit().await?;
Ok(HttpResponse::NoContent().finish())
}

View File

@@ -72,4 +72,58 @@ impl ArchonClient {
Ok(response.json::<CreateServerResponse>().await?.uuid)
}
pub async fn get_servers_by_hostname(
&self,
hostname: &str,
) -> Result<Vec<String>, reqwest::Error> {
#[derive(Deserialize)]
struct NodeByHostnameResponse {
servers: Vec<NodeServerEntry>,
}
#[derive(Deserialize)]
struct NodeServerEntry {
id: String,
#[allow(dead_code)]
available: Option<bool>,
}
let res = self
.client
.get(format!(
"{}/_internal/nodes/by-hostname/{}",
self.base_url, hostname
))
.header(X_MASTER_KEY, &self.pyro_api_key)
.send()
.await?
.error_for_status()?;
let parsed: NodeByHostnameResponse = res.json().await?;
Ok(parsed.servers.into_iter().map(|s| s.id).collect())
}
pub async fn get_active_servers_by_region(
&self,
region: &str,
) -> Result<Vec<String>, reqwest::Error> {
#[derive(Deserialize)]
struct RegionResponse {
active_servers: Vec<String>,
}
let res = self
.client
.get(format!(
"{}/_internal/nodes/regions/{}",
self.base_url, region
))
.header(X_MASTER_KEY, &self.pyro_api_key)
.send()
.await?
.error_for_status()?;
let parsed: RegionResponse = res.json().await?;
Ok(parsed.active_servers)
}
}