diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index 6271bd3cb..6102fc99a 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -152,3 +152,5 @@ MURALPAY_API_URL=https://api.muralpay.com MURALPAY_API_KEY=none MURALPAY_TRANSFER_API_KEY=none MURALPAY_SOURCE_ACCOUNT_ID=none + +DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1 diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index f4115b4ce..261513ed9 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -156,3 +156,5 @@ MURALPAY_API_URL=https://api-staging.muralpay.com MURALPAY_API_KEY=none MURALPAY_TRANSFER_API_KEY=none MURALPAY_SOURCE_ACCOUNT_ID=none + +DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1 diff --git a/apps/labrinth/.sqlx/query-08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc.json b/apps/labrinth/.sqlx/query-08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc.json new file mode 100644 index 000000000..4fa58d023 --- /dev/null +++ b/apps/labrinth/.sqlx/query-08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO payouts_values\n (user_id, amount, created,\n date_available, affiliate_code_source)\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Numeric", + "Timestamptz", + "Timestamptz", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc" +} diff --git a/apps/labrinth/.sqlx/query-0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece.json b/apps/labrinth/.sqlx/query-0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece.json new file mode 100644 index 000000000..82413ac58 --- /dev/null +++ b/apps/labrinth/.sqlx/query-0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n usap.id as usap_id,\n usap.payout_value_id\n FROM charges refund_charges\n -- find original charges that have been refunded\n INNER JOIN charges original_charges\n ON original_charges.id = refund_charges.parent_charge_id\n -- find affiliate payouts for those original charges\n INNER JOIN users_subscriptions_affiliations_payouts usap\n ON usap.charge_id = original_charges.id\n -- only include payouts that haven't been issued yet (not available as of now)\n INNER JOIN payouts_values pv\n ON pv.id = usap.payout_value_id\n AND pv.date_available > NOW()\n WHERE\n refund_charges.status = 'succeeded'\n -- make sure it's actually a refund charge\n AND refund_charges.charge_type = 'refund'\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "usap_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "payout_value_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece" +} diff --git a/apps/labrinth/.sqlx/query-270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6.json b/apps/labrinth/.sqlx/query-270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6.json new file mode 100644 index 000000000..1ab4452ce --- /dev/null +++ b/apps/labrinth/.sqlx/query-270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM payouts_values\n WHERE id = ANY($1::bigint[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [] + }, + "hash": "270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6" +} diff --git a/apps/labrinth/.sqlx/query-57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a.json b/apps/labrinth/.sqlx/query-57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a.json new file mode 100644 index 000000000..0007929c2 --- /dev/null +++ b/apps/labrinth/.sqlx/query-57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO users_subscriptions_affiliations_payouts\n (charge_id, subscription_id,\n affiliate_code, payout_value_id)\n SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Int8Array", + "Int8Array", + "Int8Array" + ] + }, + "nullable": [] + }, + "hash": "57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a" +} diff --git a/apps/labrinth/.sqlx/query-64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63.json b/apps/labrinth/.sqlx/query-64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63.json new file mode 100644 index 000000000..172159dc9 --- /dev/null +++ b/apps/labrinth/.sqlx/query-64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO users_subscriptions_affiliations\n (subscription_id, affiliate_code, deactivated_at)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63" +} diff --git a/apps/labrinth/.sqlx/query-82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5.json b/apps/labrinth/.sqlx/query-82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5.json new file mode 100644 index 000000000..6ba94641a --- /dev/null +++ b/apps/labrinth/.sqlx/query-82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO users_subscriptions_affiliations_payouts\n (charge_id, subscription_id, affiliate_code, payout_value_id)\n VALUES ($1, $2, $3, $4)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5" +} diff --git a/apps/labrinth/.sqlx/query-8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74.json b/apps/labrinth/.sqlx/query-8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74.json new file mode 100644 index 000000000..38452ab02 --- /dev/null +++ b/apps/labrinth/.sqlx/query-8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM users_subscriptions_affiliations_payouts\n WHERE id = ANY($1::bigint[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [] + }, + "hash": "8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74" +} diff --git a/apps/labrinth/.sqlx/query-abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82.json b/apps/labrinth/.sqlx/query-abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82.json new file mode 100644 index 000000000..c0de4c972 --- /dev/null +++ b/apps/labrinth/.sqlx/query-abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE users_subscriptions_affiliations\n SET deactivated_at = NOW()\n WHERE subscription_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82" +} diff --git a/apps/labrinth/.sqlx/query-e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117.json b/apps/labrinth/.sqlx/query-e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117.json new file mode 100644 index 000000000..928a31574 --- /dev/null +++ b/apps/labrinth/.sqlx/query-e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117.json @@ -0,0 +1,68 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n c.id as charge_id,\n c.subscription_id as \"subscription_id!\",\n c.net as charge_net,\n c.tax_amount as charge_tax_amount,\n c.last_attempt as charge_last_attempt,\n c.currency_code,\n usa.affiliate_code,\n ac.affiliate as affiliate_user_id,\n ac.revenue_split\n -- get any charges...\n FROM charges c\n -- ...which have a subscription...\n INNER JOIN users_subscriptions_affiliations usa\n ON c.subscription_id = usa.subscription_id\n AND c.subscription_id IS NOT NULL\n AND (usa.deactivated_at IS NULL OR c.last_attempt < usa.deactivated_at)\n -- ...which have an affiliate code...\n INNER JOIN affiliate_codes ac\n ON usa.affiliate_code = ac.id\n -- ...and where no payout to an affiliate has been made for this charge yet\n LEFT JOIN users_subscriptions_affiliations_payouts usap\n ON c.id = usap.charge_id\n WHERE\n c.status = 'succeeded'\n AND c.net > 0\n AND usap.id IS NULL\n -- exclude charges that have refund charges\n AND NOT EXISTS (\n SELECT 1\n FROM charges refund_charges\n WHERE refund_charges.parent_charge_id = c.id\n )\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "charge_id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "subscription_id!", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "charge_net", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "charge_tax_amount", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "charge_last_attempt", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "currency_code", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "affiliate_code", + "type_info": "Int8" + }, + { + "ordinal": 7, + "name": "affiliate_user_id", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "revenue_split", + "type_info": "Float8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + true, + true, + false, + true, + false, + false, + false, + true + ] + }, + "hash": "e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117" +} diff --git a/apps/labrinth/migrations/20251024182919_subscription_affiliations.sql b/apps/labrinth/migrations/20251024182919_subscription_affiliations.sql new file mode 100644 index 000000000..1369fb7a7 --- /dev/null +++ b/apps/labrinth/migrations/20251024182919_subscription_affiliations.sql @@ -0,0 +1,20 @@ +CREATE TABLE users_subscriptions_affiliations ( + subscription_id BIGINT NOT NULL REFERENCES users_subscriptions(id), + affiliate_code BIGINT NOT NULL REFERENCES affiliate_codes(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + deactivated_at TIMESTAMPTZ, + UNIQUE (subscription_id) +); + +CREATE TABLE users_subscriptions_affiliations_payouts( + id BIGSERIAL PRIMARY KEY, + charge_id BIGINT NOT NULL REFERENCES charges(id), + subscription_id BIGINT NOT NULL REFERENCES users_subscriptions(id), + affiliate_code BIGINT NOT NULL REFERENCES affiliate_codes(id), + payout_value_id BIGSERIAL NOT NULL REFERENCES payouts_values(id), + UNIQUE (charge_id), + UNIQUE (payout_value_id) +); + +ALTER TABLE payouts_values +ADD COLUMN affiliate_code_source BIGINT; diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index ee60a9876..2b6398d51 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -3,7 +3,8 @@ use crate::queue::billing::{index_billing, index_subscriptions}; use crate::queue::email::EmailQueue; use crate::queue::payouts::{ PayoutsQueue, index_payouts_notifications, - insert_bank_balances_and_webhook, process_payout, + insert_bank_balances_and_webhook, process_affiliate_payouts, + process_payout, remove_payouts_for_refunded_charges, }; use crate::search::indexing::index_projects; use crate::util::anrok; @@ -185,12 +186,22 @@ pub async fn payouts( info!("Started running payouts"); let result = process_payout(&pool, &clickhouse).await; if let Err(e) = result { - warn!("Payouts run failed: {:?}", e); + warn!("Payouts run failed: {e:#?}"); } let result = index_payouts_notifications(&pool, &redis_pool).await; if let Err(e) = result { - warn!("Payouts notifications indexing failed: {:?}", e); + warn!("Payouts notifications indexing failed: {e:#?}"); + } + + let result = process_affiliate_payouts(&pool).await; + if let Err(e) = result { + warn!("Affiliate payouts run failed: {e:#?}"); + } + + let result = remove_payouts_for_refunded_charges(&pool).await; + if let Err(e) = result { + warn!("Removing affiliate payouts for refunded charges failed: {e:#?}"); } info!("Done running payouts"); diff --git a/apps/labrinth/src/database/models/mod.rs b/apps/labrinth/src/database/models/mod.rs index 0e5f31cdf..e3f42b518 100644 --- a/apps/labrinth/src/database/models/mod.rs +++ b/apps/labrinth/src/database/models/mod.rs @@ -35,6 +35,7 @@ pub mod user_subscription_item; pub mod users_compliance; pub mod users_notifications_preferences_item; pub mod users_redeemals; +pub mod users_subscriptions_affiliations; pub mod users_subscriptions_credits; pub mod version_item; diff --git a/apps/labrinth/src/database/models/users_subscriptions_affiliations.rs b/apps/labrinth/src/database/models/users_subscriptions_affiliations.rs new file mode 100644 index 000000000..dccd79302 --- /dev/null +++ b/apps/labrinth/src/database/models/users_subscriptions_affiliations.rs @@ -0,0 +1,86 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use crate::database::models::{ + DBAffiliateCodeId, DBChargeId, DBUserSubscriptionId, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DBUsersSubscriptionsAffiliations { + pub subscription_id: DBUserSubscriptionId, + pub affiliate_code: DBAffiliateCodeId, + pub deactivated_at: Option>, +} + +impl DBUsersSubscriptionsAffiliations { + pub async fn insert<'a, E>(&self, exec: E) -> sqlx::Result<()> + where + E: sqlx::PgExecutor<'a>, + { + sqlx::query_scalar!( + " + INSERT INTO users_subscriptions_affiliations + (subscription_id, affiliate_code, deactivated_at) + VALUES ($1, $2, $3) + ", + self.subscription_id.0, + self.affiliate_code.0, + self.deactivated_at, + ) + .fetch_one(exec) + .await?; + Ok(()) + } + + pub async fn deactivate<'a, E>( + subscription_id: DBUserSubscriptionId, + exec: E, + ) -> sqlx::Result<()> + where + E: sqlx::PgExecutor<'a>, + { + sqlx::query!( + "UPDATE users_subscriptions_affiliations + SET deactivated_at = NOW() + WHERE subscription_id = $1", + subscription_id.0, + ) + .execute(exec) + .await?; + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DBUsersSubscriptionsAffiliationsPayouts { + pub id: i64, + pub charge_id: DBChargeId, + pub subscription_id: DBUserSubscriptionId, + pub affiliate_code: DBAffiliateCodeId, + pub payout_value_id: i64, +} + +impl DBUsersSubscriptionsAffiliationsPayouts { + pub async fn insert<'a, E>(&mut self, exec: E) -> sqlx::Result<()> + where + E: sqlx::PgExecutor<'a>, + { + let id = sqlx::query_scalar!( + " + INSERT INTO users_subscriptions_affiliations_payouts + (charge_id, subscription_id, affiliate_code, payout_value_id) + VALUES ($1, $2, $3, $4) + RETURNING id + ", + self.charge_id.0, + self.subscription_id.0, + self.affiliate_code.0, + self.payout_value_id, + ) + .fetch_one(exec) + .await?; + + self.id = id; + Ok(()) + } +} diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 9f7c0107c..27995d321 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -536,5 +536,7 @@ pub fn check_env_vars() -> bool { failed |= check_var::("MURALPAY_TRANSFER_API_KEY"); failed |= check_var::("MURALPAY_SOURCE_ACCOUNT_ID"); + failed |= check_var::("DEFAULT_AFFILIATE_REVENUE_SPLIT"); + failed } diff --git a/apps/labrinth/src/queue/billing.rs b/apps/labrinth/src/queue/billing.rs index bfb3d7490..dfa7b02dd 100644 --- a/apps/labrinth/src/queue/billing.rs +++ b/apps/labrinth/src/queue/billing.rs @@ -5,6 +5,7 @@ use crate::database::models::products_tax_identifier_item::DBProductsTaxIdentifi use crate::database::models::user_item::DBUser; use crate::database::models::user_subscription_item::DBUserSubscription; use crate::database::models::users_redeemals::UserRedeemal; +use crate::database::models::users_subscriptions_affiliations::DBUsersSubscriptionsAffiliations; use crate::database::models::{DatabaseError, ids::*}; use crate::database::models::{ product_item, user_subscription_item, users_redeemals, @@ -22,6 +23,7 @@ use crate::routes::internal::billing::payments::*; use crate::util::anrok; use crate::util::archon::ArchonClient; use crate::util::archon::{CreateServerRequest, Specs}; +use crate::util::error::Context; use ariadne::ids::base62_impl::to_base62; use chrono::Utc; use futures::FutureExt; @@ -937,6 +939,15 @@ async fn unprovision_subscriptions( if unprovisioned { subscription.status = SubscriptionStatus::Unprovisioned; subscription.upsert(&mut transaction).await?; + + DBUsersSubscriptionsAffiliations::deactivate( + subscription.id, + &mut *transaction, + ) + .await + .wrap_internal_err( + "failed to deactivate subscription affiliation", + )?; } clear_cache_users.push(user.id); diff --git a/apps/labrinth/src/queue/payouts/affiliate.rs b/apps/labrinth/src/queue/payouts/affiliate.rs new file mode 100644 index 000000000..61adf1ab4 --- /dev/null +++ b/apps/labrinth/src/queue/payouts/affiliate.rs @@ -0,0 +1,272 @@ +use chrono::{Datelike, Duration, TimeZone, Utc}; +use eyre::{Context, Result, eyre}; +use rust_decimal::{Decimal, dec}; +use sqlx::PgPool; +use tracing::warn; + +use crate::database::models::{DBAffiliateCodeId, DBUserId}; + +pub async fn process_affiliate_payouts(postgres: &PgPool) -> Result<()> { + // process: + // - get any subscriptions which are in `users_subscriptions_affiliations` + // - for those subscriptions, get any charges which are not in `users_subscriptions_affiliations_payouts` + // - for each of those charges, + // - get the subscription's `affiliate_code` + // - get the affiliate user of that code + // - add a payout for that affiliate user, proportional to the `net - tax` of the charge + // - add a record of this into `users_subscriptions_affiliations_payouts` + + let mut txn = postgres + .begin() + .await + .wrap_err("failed to begin transaction")?; + + let charges = sqlx::query!( + r#" + SELECT + c.id as charge_id, + c.subscription_id as "subscription_id!", + c.net as charge_net, + c.tax_amount as charge_tax_amount, + c.last_attempt as charge_last_attempt, + c.currency_code, + usa.affiliate_code, + ac.affiliate as affiliate_user_id, + ac.revenue_split + -- get any charges... + FROM charges c + -- ...which have a subscription... + INNER JOIN users_subscriptions_affiliations usa + ON c.subscription_id = usa.subscription_id + AND c.subscription_id IS NOT NULL + AND (usa.deactivated_at IS NULL OR c.last_attempt < usa.deactivated_at) + -- ...which have an affiliate code... + INNER JOIN affiliate_codes ac + ON usa.affiliate_code = ac.id + -- ...and where no payout to an affiliate has been made for this charge yet + LEFT JOIN users_subscriptions_affiliations_payouts usap + ON c.id = usap.charge_id + WHERE + c.status = 'succeeded' + AND c.net > 0 + AND usap.id IS NULL + -- exclude charges that have refund charges + AND NOT EXISTS ( + SELECT 1 + FROM charges refund_charges + WHERE refund_charges.parent_charge_id = c.id + ) + "# + ) + .fetch_all(&mut *txn) + .await + .wrap_err("failed to fetch charges awaiting affiliate payout")?; + + let default_affiliate_revenue_split = + dotenvy::var("DEFAULT_AFFILIATE_REVENUE_SPLIT") + .wrap_err("no env var `DEFAULT_AFFILIATE_REVENUE_SPLIT`")? + .parse::() + .wrap_err("`DEFAULT_AFFILIATE_REVENUE_SPLIT` is not a decimal")?; + + let ( + mut insert_usap_charges, + mut insert_usap_subscriptions, + mut insert_usap_affiliate_codes, + mut insert_usap_payout_values, + ) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); + + for row in charges { + let Some(net) = row.charge_net else { + warn!( + "Charge {} has no net amount; cannot calculate affiliate payout", + row.charge_id + ); + continue; + }; + let net = Decimal::new(net, 2); + let tax_amount = Decimal::new(row.charge_tax_amount, 2); + + let Some(last_attempt) = row.charge_last_attempt else { + warn!( + "Charge {} has no last attempt; cannot calculate affiliate payout", + row.charge_id + ); + continue; + }; + + // affiliate payouts are Net 60 from the end of the month + // this is net 60 relative to the time of the charge's last attempt, not from now + let available = { + let year = last_attempt.year(); + let month = last_attempt.month(); + + // get the first day of the next month + let last_day_of_month = if month == 12 { + Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap() + } else { + Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap() + }; + + last_day_of_month + Duration::days(59) + }; + + let revenue_split = row + .revenue_split + .and_then(Decimal::from_f64_retain) + .unwrap_or(default_affiliate_revenue_split); + if !(Decimal::from(0)..=Decimal::from(1)).contains(&revenue_split) { + warn!( + "Charge {} has revenue split {revenue_split} which is out of range", + row.charge_id, + ); + continue; + } + + let affiliate_cut = (net - tax_amount) * revenue_split; + if affiliate_cut < dec!(0.01) { + continue; + } + + let affiliate_user_id = DBUserId(row.affiliate_user_id); + let affiliate_code_id = DBAffiliateCodeId(row.affiliate_code); + + // don't batch up affiliate code payouts into one big payout per (user, affiliate code) + // because we want to associate 1 payout to an affiliate, with 1 concrete charge + + let payout_value_id = sqlx::query!( + " + INSERT INTO payouts_values + (user_id, amount, created, + date_available, affiliate_code_source) + VALUES ($1, $2, $3, $4, $5) + RETURNING id + ", + affiliate_user_id as _, + affiliate_cut, + last_attempt, + available, + affiliate_code_id as _, + ) + .fetch_one(&mut *txn) + .await + .wrap_err_with(|| eyre!("failed to insert payout value for ({affiliate_user_id:?}, {affiliate_code_id:?})"))? + .id; + + insert_usap_charges.push(row.charge_id); + insert_usap_subscriptions.push(row.subscription_id); + insert_usap_affiliate_codes.push(affiliate_code_id.0); + insert_usap_payout_values.push(payout_value_id); + } + + sqlx::query!( + " + INSERT INTO users_subscriptions_affiliations_payouts + (charge_id, subscription_id, + affiliate_code, payout_value_id) + SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[]) + ", + &insert_usap_charges[..], + &insert_usap_subscriptions[..], + &insert_usap_affiliate_codes[..], + &insert_usap_payout_values[..], + ) + .execute(&mut *txn) + .await + .wrap_err("failed to associate charges with affiliate payouts")?; + + txn.commit() + .await + .wrap_err("failed to commit transaction")?; + + Ok(()) +} + +pub async fn remove_payouts_for_refunded_charges( + postgres: &PgPool, +) -> Result<()> { + // process: + // - find refund charges + // - which have a parent charge + // - where that parent charge has a `usap` row + // - where the `usap.payout_value_id` is not available as of `now()` + // - (don't revoke payout values which have already been issued to the affiliate) + // - delete the `usap` and `usap.payout_value_id` row + + let mut txn = postgres + .begin() + .await + .wrap_err("failed to begin transaction")?; + + // note: this may return duplicate `usap_id` rows + // it's fine here, since we delete them all anyway + let refundable_payouts = sqlx::query!( + r#" + SELECT + usap.id as usap_id, + usap.payout_value_id + FROM charges refund_charges + -- find original charges that have been refunded + INNER JOIN charges original_charges + ON original_charges.id = refund_charges.parent_charge_id + -- find affiliate payouts for those original charges + INNER JOIN users_subscriptions_affiliations_payouts usap + ON usap.charge_id = original_charges.id + -- only include payouts that haven't been issued yet (not available as of now) + INNER JOIN payouts_values pv + ON pv.id = usap.payout_value_id + AND pv.date_available > NOW() + WHERE + refund_charges.status = 'succeeded' + -- make sure it's actually a refund charge + AND refund_charges.charge_type = 'refund' + "# + ) + .fetch_all(&mut *txn) + .await + .wrap_err("failed to fetch refundable affiliate payouts")?; + + if refundable_payouts.is_empty() { + txn.commit() + .await + .wrap_err("failed to commit transaction")?; + return Ok(()); + } + + let mut usap_ids = Vec::new(); + let mut payout_value_ids = Vec::new(); + + for payout in refundable_payouts { + usap_ids.push(payout.usap_id); + payout_value_ids.push(payout.payout_value_id); + } + + // Delete the affiliate payout associations + sqlx::query!( + " + DELETE FROM users_subscriptions_affiliations_payouts + WHERE id = ANY($1::bigint[]) + ", + &usap_ids[..] + ) + .execute(&mut *txn) + .await + .wrap_err("failed to delete affiliate payout associations")?; + + // Delete the payout values + sqlx::query!( + " + DELETE FROM payouts_values + WHERE id = ANY($1::bigint[]) + ", + &payout_value_ids[..] + ) + .execute(&mut *txn) + .await + .wrap_err("failed to delete payout values")?; + + txn.commit() + .await + .wrap_err("failed to commit transaction")?; + + Ok(()) +} diff --git a/apps/labrinth/src/queue/payouts/mod.rs b/apps/labrinth/src/queue/payouts/mod.rs index 6beba7566..5fca223ed 100644 --- a/apps/labrinth/src/queue/payouts/mod.rs +++ b/apps/labrinth/src/queue/payouts/mod.rs @@ -36,6 +36,11 @@ use tracing::{error, info, warn}; pub mod mural; +mod affiliate; +pub use affiliate::{ + process_affiliate_payouts, remove_payouts_for_refunded_charges, +}; + pub struct PayoutsQueue { credential: RwLock>, payout_options: RwLock>, diff --git a/apps/labrinth/src/routes/internal/billing.rs b/apps/labrinth/src/routes/internal/billing.rs index 6c8276e3c..9a33e65ce 100644 --- a/apps/labrinth/src/routes/internal/billing.rs +++ b/apps/labrinth/src/routes/internal/billing.rs @@ -4,9 +4,11 @@ use crate::database::models::charge_item::DBCharge; use crate::database::models::ids::DBUserSubscriptionId; use crate::database::models::notification_item::NotificationBuilder; use crate::database::models::products_tax_identifier_item::product_info_by_product_price_id; +use crate::database::models::users_subscriptions_affiliations::DBUsersSubscriptionsAffiliations; use crate::database::models::users_subscriptions_credits::DBUserSubscriptionCredit; use crate::database::models::{ - charge_item, generate_charge_id, product_item, user_subscription_item, + DBAffiliateCodeId, charge_item, generate_charge_id, product_item, + user_subscription_item, }; use crate::database::redis::RedisPool; use crate::models::billing::{ @@ -14,6 +16,7 @@ use crate::models::billing::{ Product, ProductMetadata, ProductPrice, SubscriptionMetadata, SubscriptionStatus, UserSubscription, }; +use crate::models::ids::AffiliateCodeId; use crate::models::notifications::NotificationBody; use crate::models::pats::Scopes; use crate::models::users::Badges; @@ -793,6 +796,11 @@ pub async fn edit_subscription( .. } if open_charge.status == ChargeStatus::Failed => { if cancelled { + DBUsersSubscriptionsAffiliations::deactivate( + subscription.id, + &mut *transaction, + ) + .await?; open_charge.status = ChargeStatus::Cancelled; } else { // Forces another resubscription attempt @@ -812,6 +820,11 @@ pub async fn edit_subscription( ) => { open_charge.status = if cancelled { + DBUsersSubscriptionsAffiliations::deactivate( + subscription.id, + &mut *transaction, + ) + .await?; ChargeStatus::Cancelled } else { ChargeStatus::Open @@ -1508,7 +1521,15 @@ pub enum ChargeRequestType { #[derive(Deserialize, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] -pub enum PaymentRequestMetadata { +pub struct PaymentRequestMetadata { + #[serde(flatten)] + pub kind: PaymentRequestMetadataKind, + pub affiliate_code: Option, +} + +#[derive(Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum PaymentRequestMetadataKind { Pyro { server_name: Option, server_region: Option, @@ -2046,12 +2067,12 @@ pub async fn stripe_webhook( } else { let (server_name, server_region, source) = if let Some( - PaymentRequestMetadata::Pyro { - ref server_name, - ref server_region, - ref source, + PaymentRequestMetadataKind::Pyro { + server_name, + server_region, + source, }, - ) = metadata.payment_metadata + ) = metadata.payment_metadata.as_ref().map(|m| &m.kind) { ( server_name.clone(), @@ -2235,6 +2256,22 @@ pub async fn stripe_webhook( } .upsert(&mut transaction) .await?; + + if let Some(affiliate_code) = metadata + .payment_metadata + .as_ref() + .and_then(|m| m.affiliate_code) + { + DBUsersSubscriptionsAffiliations { + subscription_id: subscription.id, + affiliate_code: DBAffiliateCodeId::from( + affiliate_code, + ), + deactivated_at: None, + } + .insert(&mut *transaction) + .await?; + } }; subscription.status = SubscriptionStatus::Provisioned;