1
0

Prorations (#975)

* Prorations

* Fix pyro integration

* set server uuid on creation

* fix comp

* Fix new charge date, pyro suspend reason

* Update server creation endpoint
This commit is contained in:
Geometrically
2024-10-14 13:30:04 -07:00
committed by GitHub
parent c88bfbb5f0
commit ff7975773e
6 changed files with 382 additions and 128 deletions

View File

@@ -1,58 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n us.id, us.user_id, us.price_id, us.interval, us.created, us.status, us.metadata\n FROM users_subscriptions us\n \n INNER JOIN charges c\n ON c.subscription_id = us.id\n AND (\n (c.status = 'cancelled' AND c.due < $1) OR\n (c.status = 'failed' AND c.last_attempt < $1 - INTERVAL '2 days')\n )\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "user_id",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "price_id",
"type_info": "Int8"
},
{
"ordinal": 3,
"name": "interval",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "created",
"type_info": "Timestamptz"
},
{
"ordinal": 5,
"name": "status",
"type_info": "Varchar"
},
{
"ordinal": 6,
"name": "metadata",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
true
]
},
"hash": "3cbc34bc326595fc9d070494613fca57628eed279f720565fab55c8d10decd88"
}

View File

@@ -0,0 +1,82 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT id, user_id, price_id, amount, currency_code, status, due, last_attempt, charge_type, subscription_id, subscription_interval\n FROM charges\n WHERE (status = 'cancelled' AND due < $1) OR (status = 'failed' AND last_attempt < $1 - INTERVAL '2 days')",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "user_id",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "price_id",
"type_info": "Int8"
},
{
"ordinal": 3,
"name": "amount",
"type_info": "Int8"
},
{
"ordinal": 4,
"name": "currency_code",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "status",
"type_info": "Varchar"
},
{
"ordinal": 6,
"name": "due",
"type_info": "Timestamptz"
},
{
"ordinal": 7,
"name": "last_attempt",
"type_info": "Timestamptz"
},
{
"ordinal": 8,
"name": "charge_type",
"type_info": "Text"
},
{
"ordinal": 9,
"name": "subscription_id",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "subscription_interval",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
true,
false,
true,
true
]
},
"hash": "a87c913916adf9177f8f38369975d5fc644d989293ccb42c1e06ec54dc2571f8"
}

View File

@@ -162,6 +162,22 @@ impl ChargeItem {
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn get_unprovision(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ChargeItem>, DatabaseError> {
let now = Utc::now();
let res =
select_charges_with_predicate!("WHERE (status = 'cancelled' AND due < $1) OR (status = 'failed' AND last_attempt < $1 - INTERVAL '2 days')", now)
.fetch_all(exec)
.await?;
Ok(res
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn remove(
id: ChargeId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,

View File

@@ -95,30 +95,6 @@ impl UserSubscriptionItem {
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn get_all_unprovision(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let now = Utc::now();
let results = select_user_subscriptions_with_predicate!(
"
INNER JOIN charges c
ON c.subscription_id = us.id
AND (
(c.status = 'cancelled' AND c.due < $1) OR
(c.status = 'failed' AND c.last_attempt < $1 - INTERVAL '2 days')
)
",
now
)
.fetch_all(exec)
.await?;
Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn upsert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,

View File

@@ -21,7 +21,12 @@ pub struct Product {
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum ProductMetadata {
Midas,
Pyro { ram: u32 },
Pyro {
cpu: u32,
ram: u32,
swap: u32,
storage: u32,
},
}
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
@@ -70,13 +75,16 @@ impl PriceDuration {
_ => PriceDuration::Monthly,
}
}
pub fn as_str(&self) -> &'static str {
match self {
PriceDuration::Monthly => "monthly",
PriceDuration::Yearly => "yearly",
}
}
pub fn iterator() -> impl Iterator<Item = PriceDuration> {
vec![PriceDuration::Monthly, PriceDuration::Yearly].into_iter()
}
}
#[derive(Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]

View File

@@ -1,4 +1,5 @@
use crate::auth::{get_user_from_headers, send_email};
use crate::database::models::charge_item::ChargeItem;
use crate::database::models::{
generate_charge_id, generate_user_subscription_id, product_item, user_subscription_item,
};
@@ -15,6 +16,8 @@ use crate::routes::ApiError;
use actix_web::{delete, get, patch, post, web, HttpRequest, HttpResponse};
use chrono::Utc;
use log::{info, warn};
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use serde::Serialize;
use serde_with::serde_derive::Deserialize;
use sqlx::{PgPool, Postgres, Transaction};
@@ -105,6 +108,7 @@ pub async fn subscriptions(
#[derive(Deserialize)]
pub struct SubscriptionEdit {
pub interval: Option<PriceDuration>,
pub payment_method: Option<String>,
pub cancelled: Option<bool>,
pub product: Option<crate::models::ids::ProductId>,
}
@@ -117,6 +121,7 @@ pub async fn edit_subscription(
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
edit_subscription: web::Json<SubscriptionEdit>,
stripe_client: web::Data<stripe::Client>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
@@ -187,11 +192,139 @@ pub async fn edit_subscription(
}
}
let intent = if let Some(product_id) = &edit_subscription.product {
let product_price = product_item::ProductPriceItem::get_all_product_prices(
(*product_id).into(),
&mut *transaction,
)
.await?
.into_iter()
.find(|x| x.currency_code == current_price.currency_code)
.ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for your currency code!".to_string(),
)
})?;
if product_price.id == current_price.id {
return Err(ApiError::InvalidInput(
"You may not change the price of this subscription!".to_string(),
));
}
let interval = open_charge.due - Utc::now();
let duration = PriceDuration::iterator()
.min_by_key(|x| (x.duration().num_seconds() - interval.num_seconds()).abs())
.unwrap_or(PriceDuration::Monthly);
let current_amount = match &current_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let amount = match &product_price.prices {
Price::OneTime { price } => *price,
Price::Recurring { intervals } => *intervals.get(&duration).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's duration".to_string(),
)
})?,
};
let complete = Decimal::from(interval.num_seconds())
/ Decimal::from(duration.duration().num_seconds());
let proration = (Decimal::from(amount - current_amount) * complete)
.floor()
.to_i32()
.ok_or_else(|| {
ApiError::InvalidInput("Could not convert proration to i32".to_string())
})?;
// TODO: Add downgrading plans
if proration <= 0 {
return Err(ApiError::InvalidInput(
"You may not downgrade plans!".to_string(),
));
}
let charge_id = generate_charge_id(&mut transaction).await?;
let charge = ChargeItem {
id: charge_id,
user_id: user.id.into(),
price_id: product_price.id,
amount: proration as i64,
currency_code: current_price.currency_code.clone(),
status: ChargeStatus::Processing,
due: Utc::now(),
last_attempt: None,
type_: ChargeType::Proration,
subscription_id: Some(subscription.id),
subscription_interval: Some(duration),
};
let customer_id = get_or_create_customer(
user.id,
user.stripe_customer_id.as_deref(),
user.email.as_deref(),
&stripe_client,
&pool,
&redis,
)
.await?;
let currency = Currency::from_str(&current_price.currency_code.to_lowercase())
.map_err(|_| ApiError::InvalidInput("Invalid currency code".to_string()))?;
let mut intent = CreatePaymentIntent::new(proration as i64, currency);
let mut metadata = HashMap::new();
metadata.insert("modrinth_user_id".to_string(), to_base62(user.id.0));
intent.customer = Some(customer_id);
intent.metadata = Some(metadata);
intent.receipt_email = user.email.as_deref();
intent.setup_future_usage = Some(PaymentIntentSetupFutureUsage::OffSession);
if let Some(payment_method) = &edit_subscription.payment_method {
let payment_method_id = if let Ok(id) = PaymentMethodId::from_str(&payment_method) {
id
} else {
return Err(ApiError::InvalidInput(
"Invalid payment method id".to_string(),
));
};
intent.payment_method = Some(payment_method_id);
}
charge.upsert(&mut transaction).await?;
Some((
proration,
0,
stripe::PaymentIntent::create(&stripe_client, intent).await?,
))
} else {
None
};
open_charge.upsert(&mut transaction).await?;
transaction.commit().await?;
Ok(HttpResponse::NoContent().body(""))
if let Some((amount, tax, payment_intent)) = intent {
Ok(HttpResponse::Ok().json(serde_json::json!({
"payment_intent_id": payment_intent.id,
"client_secret": payment_intent.client_secret,
"tax": tax,
"total": amount
})))
} else {
Ok(HttpResponse::NoContent().body(""))
}
} else {
Err(ApiError::NotFound)
}
@@ -971,9 +1104,17 @@ pub async fn stripe_webhook(
break 'metadata;
};
if let Some(interval) = charge.subscription_interval {
subscription.interval = interval;
match charge.type_ {
ChargeType::OneTime | ChargeType::Subscription => {
if let Some(interval) = charge.subscription_interval {
subscription.interval = interval;
}
}
ChargeType::Proration => {
subscription.price_id = charge.price_id;
}
}
subscription.upsert(transaction).await?;
(charge, price, product, Some(subscription))
@@ -1037,11 +1178,7 @@ pub async fn stripe_webhook(
price_id,
interval,
created: Utc::now(),
status: if charge_status == ChargeStatus::Succeeded {
SubscriptionStatus::Provisioned
} else {
SubscriptionStatus::Unprovisioned
},
status: SubscriptionStatus::Unprovisioned,
metadata: None,
};
@@ -1056,7 +1193,7 @@ pub async fn stripe_webhook(
}
};
let charge = crate::database::models::charge_item::ChargeItem {
let charge = ChargeItem {
id: charge_id,
user_id,
price_id,
@@ -1101,7 +1238,7 @@ pub async fn stripe_webhook(
if let EventObject::PaymentIntent(payment_intent) = event.data.object {
let mut transaction = pool.begin().await?;
let metadata = get_payment_intent_metadata(
let mut metadata = get_payment_intent_metadata(
payment_intent.metadata,
&pool,
&redis,
@@ -1127,78 +1264,144 @@ pub async fn stripe_webhook(
.execute(&mut *transaction)
.await?;
}
ProductMetadata::Pyro { ram } => {
ProductMetadata::Pyro {
ram,
cpu,
swap,
storage,
} => {
if let Some(ref subscription) = metadata.user_subscription_item {
let client = reqwest::Client::new();
if let Some(SubscriptionMetadata::Pyro { id }) =
&subscription.metadata
{
let res = client
client
.post(format!(
"https://archon.pyro.host/v0/servers/{}/unsuspend",
"https://archon.pyro.host/modrinth/v0/servers/{}/unsuspend",
id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.send()
.await;
.await?
.error_for_status()?;
if let Err(e) = res {
warn!("Error unsuspending pyro server: {:?}", e);
}
} else if let Some(PaymentRequestMetadata::Pyro {
server_name,
source,
}) = &metadata.payment_metadata
{
let server_name = server_name.clone().unwrap_or_else(|| {
// TODO: Send plan upgrade request for proration
} else {
let (server_name, source) = if let Some(
PaymentRequestMetadata::Pyro {
ref server_name,
ref source,
},
) = metadata.payment_metadata
{
(server_name.clone(), source.clone())
} else {
// Create a server with the latest version of Minecraft
let minecraft_versions = crate::database::models::legacy_loader_fields::MinecraftGameVersion::list(
Some("release"),
None,
&**pool,
&redis,
).await?;
(
None,
serde_json::json!({
"loader": "Vanilla",
"game_version": minecraft_versions.first().map(|x| x.version.clone()),
"loader_version": ""
}),
)
};
let server_name = server_name.unwrap_or_else(|| {
format!("{}'s server", metadata.user_item.username)
});
#[derive(Deserialize)]
struct PyroServerResponse {
uuid: String,
}
let res = client
.post("https://archon.pyro.host/v0/servers/create")
.post("https://archon.pyro.host/modrinth/v0/servers/create")
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"user_id": to_base62(metadata.user_item.id.0 as u64),
"name": server_name,
"specs": {
"ram": ram,
"cpu": std::cmp::max(2, (ram / 1024) / 2),
"swap": ram / 4,
"memory_mb": ram,
"cpu": cpu,
"swap_mb": swap,
"storage_mb": storage,
},
"source": source,
}))
.send()
.await;
.await?
.error_for_status()?
.json::<PyroServerResponse>()
.await?;
if let Err(e) = res {
warn!("Error creating pyro server: {:?}", e);
if let Some(ref mut subscription) =
metadata.user_subscription_item
{
subscription.metadata =
Some(SubscriptionMetadata::Pyro { id: res.uuid });
}
}
}
}
}
if let Some(subscription) = metadata.user_subscription_item {
if metadata.charge_item.status != ChargeStatus::Cancelled {
if let Some(mut subscription) = metadata.user_subscription_item {
let open_charge =
ChargeItem::get_open_subscription(subscription.id, &mut *transaction)
.await?;
let new_price = match metadata.product_price_item.prices {
Price::OneTime { price } => price,
Price::Recurring { intervals } => {
*intervals.get(&subscription.interval).ok_or_else(|| {
ApiError::InvalidInput(
"Could not find a valid price for the user's country"
.to_string(),
)
})?
}
};
if let Some(mut charge) = open_charge {
charge.price_id = metadata.product_price_item.id;
charge.amount = new_price as i64;
charge.upsert(&mut transaction).await?;
} else if metadata.charge_item.status != ChargeStatus::Cancelled {
let charge_id = generate_charge_id(&mut transaction).await?;
let charge = crate::database::models::charge_item::ChargeItem {
ChargeItem {
id: charge_id,
user_id: metadata.user_item.id,
price_id: metadata.product_price_item.id,
amount: metadata.charge_item.amount,
amount: new_price as i64,
currency_code: metadata.product_price_item.currency_code,
status: ChargeStatus::Open,
due: Utc::now() + subscription.interval.duration(),
due: if subscription.status == SubscriptionStatus::Unprovisioned {
Utc::now() + subscription.interval.duration()
} else {
metadata.charge_item.due + subscription.interval.duration()
},
last_attempt: None,
type_: ChargeType::Subscription,
subscription_id: Some(subscription.id),
subscription_interval: Some(subscription.interval),
};
let err = charge.upsert(&mut transaction).await;
}
.upsert(&mut transaction)
.await?;
};
err?;
}
subscription.status = SubscriptionStatus::Provisioned;
subscription.upsert(&mut transaction).await?;
}
transaction.commit().await?;
@@ -1346,8 +1549,18 @@ pub async fn subscription_task(pool: PgPool, redis: RedisPool) {
let mut clear_cache_users = Vec::new();
// If an active subscription has a canceled charge OR a failed charge more than two days ago, it should be cancelled
let all_subscriptions =
user_subscription_item::UserSubscriptionItem::get_all_unprovision(&pool).await?;
let all_charges = ChargeItem::get_unprovision(&pool).await?;
let mut all_subscriptions = user_subscription_item::UserSubscriptionItem::get_many(
&all_charges
.iter()
.filter_map(|x| x.subscription_id)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>(),
&pool,
)
.await?;
let subscription_prices = product_item::ProductPriceItem::get_many(
&all_subscriptions
.iter()
@@ -1380,7 +1593,20 @@ pub async fn subscription_task(pool: PgPool, redis: RedisPool) {
)
.await?;
for mut subscription in all_subscriptions {
for charge in all_charges {
let subscription = if let Some(subscription) = all_subscriptions
.iter_mut()
.find(|x| Some(x.id) == charge.subscription_id)
{
subscription
} else {
continue;
};
if subscription.status == SubscriptionStatus::Unprovisioned {
continue;
}
let product_price = if let Some(product_price) = subscription_prices
.iter()
.find(|x| x.id == subscription.price_id)
@@ -1427,12 +1653,16 @@ pub async fn subscription_task(pool: PgPool, redis: RedisPool) {
if let Some(SubscriptionMetadata::Pyro { id }) = &subscription.metadata {
let res = reqwest::Client::new()
.post(format!(
"https://archon.pyro.host/v0/servers/{}/suspend",
"https://archon.pyro.host/modrinth/v0/servers/{}/suspend",
id
))
.header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?)
.json(&serde_json::json!({
"reason": "cancelled"
"reason": if charge.status == ChargeStatus::Cancelled {
"cancelled"
} else {
"paymentfailed"
}
}))
.send()
.await;