You've already forked AstralRinth
Add revenue split to affiliate codes v2 (#4672)
* wip: affiliate payouts again * Implement affiliate payout queue * Deactivate subscription affiliations on cancellation * Remove a test that never compiled in the first place * Update sqlx cache * address some PR comments * more comments * wip: handle refund charges * cargo sqlx prepare * Address PR comments * cargo sqlx prepare
This commit is contained in:
@@ -152,3 +152,5 @@ MURALPAY_API_URL=https://api.muralpay.com
|
||||
MURALPAY_API_KEY=none
|
||||
MURALPAY_TRANSFER_API_KEY=none
|
||||
MURALPAY_SOURCE_ACCOUNT_ID=none
|
||||
|
||||
DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1
|
||||
|
||||
@@ -156,3 +156,5 @@ MURALPAY_API_URL=https://api-staging.muralpay.com
|
||||
MURALPAY_API_KEY=none
|
||||
MURALPAY_TRANSFER_API_KEY=none
|
||||
MURALPAY_SOURCE_ACCOUNT_ID=none
|
||||
|
||||
DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1
|
||||
|
||||
26
apps/labrinth/.sqlx/query-08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc.json
generated
Normal file
26
apps/labrinth/.sqlx/query-08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc.json
generated
Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO payouts_values\n (user_id, amount, created,\n date_available, affiliate_code_source)\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8",
|
||||
"Numeric",
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "08310363d63462bf1d07f950f09b8e3b466c7d6fd7a6efd3984a3cbc87f996bc"
|
||||
}
|
||||
26
apps/labrinth/.sqlx/query-0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece.json
generated
Normal file
26
apps/labrinth/.sqlx/query-0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece.json
generated
Normal file
@@ -0,0 +1,26 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n usap.id as usap_id,\n usap.payout_value_id\n FROM charges refund_charges\n -- find original charges that have been refunded\n INNER JOIN charges original_charges\n ON original_charges.id = refund_charges.parent_charge_id\n -- find affiliate payouts for those original charges\n INNER JOIN users_subscriptions_affiliations_payouts usap\n ON usap.charge_id = original_charges.id\n -- only include payouts that haven't been issued yet (not available as of now)\n INNER JOIN payouts_values pv\n ON pv.id = usap.payout_value_id\n AND pv.date_available > NOW()\n WHERE\n refund_charges.status = 'succeeded'\n -- make sure it's actually a refund charge\n AND refund_charges.charge_type = 'refund'\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "usap_id",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "payout_value_id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "0d9ae03c785ef21ecd9dc17d7c025431bf14c6e168a8350f1fe7602be4934ece"
|
||||
}
|
||||
14
apps/labrinth/.sqlx/query-270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6.json
generated
Normal file
14
apps/labrinth/.sqlx/query-270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6.json
generated
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM payouts_values\n WHERE id = ANY($1::bigint[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "270cf4cb3a56ee14f3671d8d3ad17ed8fbdc0ca791370ac52ff480c0e2d2cbf6"
|
||||
}
|
||||
17
apps/labrinth/.sqlx/query-57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a.json
generated
Normal file
17
apps/labrinth/.sqlx/query-57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a.json
generated
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO users_subscriptions_affiliations_payouts\n (charge_id, subscription_id,\n affiliate_code, payout_value_id)\n SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "57fd45648479590a01e8da51b2ecd59cbe76a1ae8acb4c2d7c96ba397739873a"
|
||||
}
|
||||
16
apps/labrinth/.sqlx/query-64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63.json
generated
Normal file
16
apps/labrinth/.sqlx/query-64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63.json
generated
Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO users_subscriptions_affiliations\n (subscription_id, affiliate_code, deactivated_at)\n VALUES ($1, $2, $3)\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8",
|
||||
"Int8",
|
||||
"Timestamptz"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "64844433bb6c7e5a48890ec42786f56a5e220c21aa9d0fc0c03f57bee864fb63"
|
||||
}
|
||||
25
apps/labrinth/.sqlx/query-82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5.json
generated
Normal file
25
apps/labrinth/.sqlx/query-82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5.json
generated
Normal file
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO users_subscriptions_affiliations_payouts\n (charge_id, subscription_id, affiliate_code, payout_value_id)\n VALUES ($1, $2, $3, $4)\n RETURNING id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8",
|
||||
"Int8",
|
||||
"Int8",
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "82a8120805e27f9134ccaa02ea25e7ddaf51f952783f68eda706505a825f25a5"
|
||||
}
|
||||
14
apps/labrinth/.sqlx/query-8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74.json
generated
Normal file
14
apps/labrinth/.sqlx/query-8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74.json
generated
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n DELETE FROM users_subscriptions_affiliations_payouts\n WHERE id = ANY($1::bigint[])\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "8b3990f01c62c20fc2ff66566282db593fd6e8990a7e32305c2daa4aac3f6d74"
|
||||
}
|
||||
14
apps/labrinth/.sqlx/query-abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82.json
generated
Normal file
14
apps/labrinth/.sqlx/query-abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82.json
generated
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE users_subscriptions_affiliations\n SET deactivated_at = NOW()\n WHERE subscription_id = $1",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "abdda73294ec06970af162132e49c3b8116b282e0edb2d3c71b8a98e6353ce82"
|
||||
}
|
||||
68
apps/labrinth/.sqlx/query-e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117.json
generated
Normal file
68
apps/labrinth/.sqlx/query-e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117.json
generated
Normal file
@@ -0,0 +1,68 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n c.id as charge_id,\n c.subscription_id as \"subscription_id!\",\n c.net as charge_net,\n c.tax_amount as charge_tax_amount,\n c.last_attempt as charge_last_attempt,\n c.currency_code,\n usa.affiliate_code,\n ac.affiliate as affiliate_user_id,\n ac.revenue_split\n -- get any charges...\n FROM charges c\n -- ...which have a subscription...\n INNER JOIN users_subscriptions_affiliations usa\n ON c.subscription_id = usa.subscription_id\n AND c.subscription_id IS NOT NULL\n AND (usa.deactivated_at IS NULL OR c.last_attempt < usa.deactivated_at)\n -- ...which have an affiliate code...\n INNER JOIN affiliate_codes ac\n ON usa.affiliate_code = ac.id\n -- ...and where no payout to an affiliate has been made for this charge yet\n LEFT JOIN users_subscriptions_affiliations_payouts usap\n ON c.id = usap.charge_id\n WHERE\n c.status = 'succeeded'\n AND c.net > 0\n AND usap.id IS NULL\n -- exclude charges that have refund charges\n AND NOT EXISTS (\n SELECT 1\n FROM charges refund_charges\n WHERE refund_charges.parent_charge_id = c.id\n )\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "charge_id",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "subscription_id!",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "charge_net",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 3,
|
||||
"name": "charge_tax_amount",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 4,
|
||||
"name": "charge_last_attempt",
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"ordinal": 5,
|
||||
"name": "currency_code",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 6,
|
||||
"name": "affiliate_code",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 7,
|
||||
"name": "affiliate_user_id",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 8,
|
||||
"name": "revenue_split",
|
||||
"type_info": "Float8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
true
|
||||
]
|
||||
},
|
||||
"hash": "e8225fa8dae7e1ca57bc8f259cda40c3eb8b12943db05884386a456c5eb91117"
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
CREATE TABLE users_subscriptions_affiliations (
|
||||
subscription_id BIGINT NOT NULL REFERENCES users_subscriptions(id),
|
||||
affiliate_code BIGINT NOT NULL REFERENCES affiliate_codes(id),
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deactivated_at TIMESTAMPTZ,
|
||||
UNIQUE (subscription_id)
|
||||
);
|
||||
|
||||
CREATE TABLE users_subscriptions_affiliations_payouts(
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
charge_id BIGINT NOT NULL REFERENCES charges(id),
|
||||
subscription_id BIGINT NOT NULL REFERENCES users_subscriptions(id),
|
||||
affiliate_code BIGINT NOT NULL REFERENCES affiliate_codes(id),
|
||||
payout_value_id BIGSERIAL NOT NULL REFERENCES payouts_values(id),
|
||||
UNIQUE (charge_id),
|
||||
UNIQUE (payout_value_id)
|
||||
);
|
||||
|
||||
ALTER TABLE payouts_values
|
||||
ADD COLUMN affiliate_code_source BIGINT;
|
||||
@@ -3,7 +3,8 @@ use crate::queue::billing::{index_billing, index_subscriptions};
|
||||
use crate::queue::email::EmailQueue;
|
||||
use crate::queue::payouts::{
|
||||
PayoutsQueue, index_payouts_notifications,
|
||||
insert_bank_balances_and_webhook, process_payout,
|
||||
insert_bank_balances_and_webhook, process_affiliate_payouts,
|
||||
process_payout, remove_payouts_for_refunded_charges,
|
||||
};
|
||||
use crate::search::indexing::index_projects;
|
||||
use crate::util::anrok;
|
||||
@@ -185,12 +186,22 @@ pub async fn payouts(
|
||||
info!("Started running payouts");
|
||||
let result = process_payout(&pool, &clickhouse).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Payouts run failed: {:?}", e);
|
||||
warn!("Payouts run failed: {e:#?}");
|
||||
}
|
||||
|
||||
let result = index_payouts_notifications(&pool, &redis_pool).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Payouts notifications indexing failed: {:?}", e);
|
||||
warn!("Payouts notifications indexing failed: {e:#?}");
|
||||
}
|
||||
|
||||
let result = process_affiliate_payouts(&pool).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Affiliate payouts run failed: {e:#?}");
|
||||
}
|
||||
|
||||
let result = remove_payouts_for_refunded_charges(&pool).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Removing affiliate payouts for refunded charges failed: {e:#?}");
|
||||
}
|
||||
|
||||
info!("Done running payouts");
|
||||
|
||||
@@ -35,6 +35,7 @@ pub mod user_subscription_item;
|
||||
pub mod users_compliance;
|
||||
pub mod users_notifications_preferences_item;
|
||||
pub mod users_redeemals;
|
||||
pub mod users_subscriptions_affiliations;
|
||||
pub mod users_subscriptions_credits;
|
||||
pub mod version_item;
|
||||
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::database::models::{
|
||||
DBAffiliateCodeId, DBChargeId, DBUserSubscriptionId,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DBUsersSubscriptionsAffiliations {
|
||||
pub subscription_id: DBUserSubscriptionId,
|
||||
pub affiliate_code: DBAffiliateCodeId,
|
||||
pub deactivated_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl DBUsersSubscriptionsAffiliations {
|
||||
pub async fn insert<'a, E>(&self, exec: E) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
sqlx::query_scalar!(
|
||||
"
|
||||
INSERT INTO users_subscriptions_affiliations
|
||||
(subscription_id, affiliate_code, deactivated_at)
|
||||
VALUES ($1, $2, $3)
|
||||
",
|
||||
self.subscription_id.0,
|
||||
self.affiliate_code.0,
|
||||
self.deactivated_at,
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn deactivate<'a, E>(
|
||||
subscription_id: DBUserSubscriptionId,
|
||||
exec: E,
|
||||
) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
sqlx::query!(
|
||||
"UPDATE users_subscriptions_affiliations
|
||||
SET deactivated_at = NOW()
|
||||
WHERE subscription_id = $1",
|
||||
subscription_id.0,
|
||||
)
|
||||
.execute(exec)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DBUsersSubscriptionsAffiliationsPayouts {
|
||||
pub id: i64,
|
||||
pub charge_id: DBChargeId,
|
||||
pub subscription_id: DBUserSubscriptionId,
|
||||
pub affiliate_code: DBAffiliateCodeId,
|
||||
pub payout_value_id: i64,
|
||||
}
|
||||
|
||||
impl DBUsersSubscriptionsAffiliationsPayouts {
|
||||
pub async fn insert<'a, E>(&mut self, exec: E) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let id = sqlx::query_scalar!(
|
||||
"
|
||||
INSERT INTO users_subscriptions_affiliations_payouts
|
||||
(charge_id, subscription_id, affiliate_code, payout_value_id)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
",
|
||||
self.charge_id.0,
|
||||
self.subscription_id.0,
|
||||
self.affiliate_code.0,
|
||||
self.payout_value_id,
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
|
||||
self.id = id;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -536,5 +536,7 @@ pub fn check_env_vars() -> bool {
|
||||
failed |= check_var::<String>("MURALPAY_TRANSFER_API_KEY");
|
||||
failed |= check_var::<String>("MURALPAY_SOURCE_ACCOUNT_ID");
|
||||
|
||||
failed |= check_var::<String>("DEFAULT_AFFILIATE_REVENUE_SPLIT");
|
||||
|
||||
failed
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::database::models::products_tax_identifier_item::DBProductsTaxIdentifi
|
||||
use crate::database::models::user_item::DBUser;
|
||||
use crate::database::models::user_subscription_item::DBUserSubscription;
|
||||
use crate::database::models::users_redeemals::UserRedeemal;
|
||||
use crate::database::models::users_subscriptions_affiliations::DBUsersSubscriptionsAffiliations;
|
||||
use crate::database::models::{DatabaseError, ids::*};
|
||||
use crate::database::models::{
|
||||
product_item, user_subscription_item, users_redeemals,
|
||||
@@ -22,6 +23,7 @@ use crate::routes::internal::billing::payments::*;
|
||||
use crate::util::anrok;
|
||||
use crate::util::archon::ArchonClient;
|
||||
use crate::util::archon::{CreateServerRequest, Specs};
|
||||
use crate::util::error::Context;
|
||||
use ariadne::ids::base62_impl::to_base62;
|
||||
use chrono::Utc;
|
||||
use futures::FutureExt;
|
||||
@@ -937,6 +939,15 @@ async fn unprovision_subscriptions(
|
||||
if unprovisioned {
|
||||
subscription.status = SubscriptionStatus::Unprovisioned;
|
||||
subscription.upsert(&mut transaction).await?;
|
||||
|
||||
DBUsersSubscriptionsAffiliations::deactivate(
|
||||
subscription.id,
|
||||
&mut *transaction,
|
||||
)
|
||||
.await
|
||||
.wrap_internal_err(
|
||||
"failed to deactivate subscription affiliation",
|
||||
)?;
|
||||
}
|
||||
|
||||
clear_cache_users.push(user.id);
|
||||
|
||||
272
apps/labrinth/src/queue/payouts/affiliate.rs
Normal file
272
apps/labrinth/src/queue/payouts/affiliate.rs
Normal file
@@ -0,0 +1,272 @@
|
||||
use chrono::{Datelike, Duration, TimeZone, Utc};
|
||||
use eyre::{Context, Result, eyre};
|
||||
use rust_decimal::{Decimal, dec};
|
||||
use sqlx::PgPool;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::database::models::{DBAffiliateCodeId, DBUserId};
|
||||
|
||||
pub async fn process_affiliate_payouts(postgres: &PgPool) -> Result<()> {
|
||||
// process:
|
||||
// - get any subscriptions which are in `users_subscriptions_affiliations`
|
||||
// - for those subscriptions, get any charges which are not in `users_subscriptions_affiliations_payouts`
|
||||
// - for each of those charges,
|
||||
// - get the subscription's `affiliate_code`
|
||||
// - get the affiliate user of that code
|
||||
// - add a payout for that affiliate user, proportional to the `net - tax` of the charge
|
||||
// - add a record of this into `users_subscriptions_affiliations_payouts`
|
||||
|
||||
let mut txn = postgres
|
||||
.begin()
|
||||
.await
|
||||
.wrap_err("failed to begin transaction")?;
|
||||
|
||||
let charges = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
c.id as charge_id,
|
||||
c.subscription_id as "subscription_id!",
|
||||
c.net as charge_net,
|
||||
c.tax_amount as charge_tax_amount,
|
||||
c.last_attempt as charge_last_attempt,
|
||||
c.currency_code,
|
||||
usa.affiliate_code,
|
||||
ac.affiliate as affiliate_user_id,
|
||||
ac.revenue_split
|
||||
-- get any charges...
|
||||
FROM charges c
|
||||
-- ...which have a subscription...
|
||||
INNER JOIN users_subscriptions_affiliations usa
|
||||
ON c.subscription_id = usa.subscription_id
|
||||
AND c.subscription_id IS NOT NULL
|
||||
AND (usa.deactivated_at IS NULL OR c.last_attempt < usa.deactivated_at)
|
||||
-- ...which have an affiliate code...
|
||||
INNER JOIN affiliate_codes ac
|
||||
ON usa.affiliate_code = ac.id
|
||||
-- ...and where no payout to an affiliate has been made for this charge yet
|
||||
LEFT JOIN users_subscriptions_affiliations_payouts usap
|
||||
ON c.id = usap.charge_id
|
||||
WHERE
|
||||
c.status = 'succeeded'
|
||||
AND c.net > 0
|
||||
AND usap.id IS NULL
|
||||
-- exclude charges that have refund charges
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM charges refund_charges
|
||||
WHERE refund_charges.parent_charge_id = c.id
|
||||
)
|
||||
"#
|
||||
)
|
||||
.fetch_all(&mut *txn)
|
||||
.await
|
||||
.wrap_err("failed to fetch charges awaiting affiliate payout")?;
|
||||
|
||||
let default_affiliate_revenue_split =
|
||||
dotenvy::var("DEFAULT_AFFILIATE_REVENUE_SPLIT")
|
||||
.wrap_err("no env var `DEFAULT_AFFILIATE_REVENUE_SPLIT`")?
|
||||
.parse::<Decimal>()
|
||||
.wrap_err("`DEFAULT_AFFILIATE_REVENUE_SPLIT` is not a decimal")?;
|
||||
|
||||
let (
|
||||
mut insert_usap_charges,
|
||||
mut insert_usap_subscriptions,
|
||||
mut insert_usap_affiliate_codes,
|
||||
mut insert_usap_payout_values,
|
||||
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
|
||||
|
||||
for row in charges {
|
||||
let Some(net) = row.charge_net else {
|
||||
warn!(
|
||||
"Charge {} has no net amount; cannot calculate affiliate payout",
|
||||
row.charge_id
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let net = Decimal::new(net, 2);
|
||||
let tax_amount = Decimal::new(row.charge_tax_amount, 2);
|
||||
|
||||
let Some(last_attempt) = row.charge_last_attempt else {
|
||||
warn!(
|
||||
"Charge {} has no last attempt; cannot calculate affiliate payout",
|
||||
row.charge_id
|
||||
);
|
||||
continue;
|
||||
};
|
||||
|
||||
// affiliate payouts are Net 60 from the end of the month
|
||||
// this is net 60 relative to the time of the charge's last attempt, not from now
|
||||
let available = {
|
||||
let year = last_attempt.year();
|
||||
let month = last_attempt.month();
|
||||
|
||||
// get the first day of the next month
|
||||
let last_day_of_month = if month == 12 {
|
||||
Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap()
|
||||
} else {
|
||||
Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap()
|
||||
};
|
||||
|
||||
last_day_of_month + Duration::days(59)
|
||||
};
|
||||
|
||||
let revenue_split = row
|
||||
.revenue_split
|
||||
.and_then(Decimal::from_f64_retain)
|
||||
.unwrap_or(default_affiliate_revenue_split);
|
||||
if !(Decimal::from(0)..=Decimal::from(1)).contains(&revenue_split) {
|
||||
warn!(
|
||||
"Charge {} has revenue split {revenue_split} which is out of range",
|
||||
row.charge_id,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let affiliate_cut = (net - tax_amount) * revenue_split;
|
||||
if affiliate_cut < dec!(0.01) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let affiliate_user_id = DBUserId(row.affiliate_user_id);
|
||||
let affiliate_code_id = DBAffiliateCodeId(row.affiliate_code);
|
||||
|
||||
// don't batch up affiliate code payouts into one big payout per (user, affiliate code)
|
||||
// because we want to associate 1 payout to an affiliate, with 1 concrete charge
|
||||
|
||||
let payout_value_id = sqlx::query!(
|
||||
"
|
||||
INSERT INTO payouts_values
|
||||
(user_id, amount, created,
|
||||
date_available, affiliate_code_source)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING id
|
||||
",
|
||||
affiliate_user_id as _,
|
||||
affiliate_cut,
|
||||
last_attempt,
|
||||
available,
|
||||
affiliate_code_id as _,
|
||||
)
|
||||
.fetch_one(&mut *txn)
|
||||
.await
|
||||
.wrap_err_with(|| eyre!("failed to insert payout value for ({affiliate_user_id:?}, {affiliate_code_id:?})"))?
|
||||
.id;
|
||||
|
||||
insert_usap_charges.push(row.charge_id);
|
||||
insert_usap_subscriptions.push(row.subscription_id);
|
||||
insert_usap_affiliate_codes.push(affiliate_code_id.0);
|
||||
insert_usap_payout_values.push(payout_value_id);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
INSERT INTO users_subscriptions_affiliations_payouts
|
||||
(charge_id, subscription_id,
|
||||
affiliate_code, payout_value_id)
|
||||
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[])
|
||||
",
|
||||
&insert_usap_charges[..],
|
||||
&insert_usap_subscriptions[..],
|
||||
&insert_usap_affiliate_codes[..],
|
||||
&insert_usap_payout_values[..],
|
||||
)
|
||||
.execute(&mut *txn)
|
||||
.await
|
||||
.wrap_err("failed to associate charges with affiliate payouts")?;
|
||||
|
||||
txn.commit()
|
||||
.await
|
||||
.wrap_err("failed to commit transaction")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_payouts_for_refunded_charges(
|
||||
postgres: &PgPool,
|
||||
) -> Result<()> {
|
||||
// process:
|
||||
// - find refund charges
|
||||
// - which have a parent charge
|
||||
// - where that parent charge has a `usap` row
|
||||
// - where the `usap.payout_value_id` is not available as of `now()`
|
||||
// - (don't revoke payout values which have already been issued to the affiliate)
|
||||
// - delete the `usap` and `usap.payout_value_id` row
|
||||
|
||||
let mut txn = postgres
|
||||
.begin()
|
||||
.await
|
||||
.wrap_err("failed to begin transaction")?;
|
||||
|
||||
// note: this may return duplicate `usap_id` rows
|
||||
// it's fine here, since we delete them all anyway
|
||||
let refundable_payouts = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
usap.id as usap_id,
|
||||
usap.payout_value_id
|
||||
FROM charges refund_charges
|
||||
-- find original charges that have been refunded
|
||||
INNER JOIN charges original_charges
|
||||
ON original_charges.id = refund_charges.parent_charge_id
|
||||
-- find affiliate payouts for those original charges
|
||||
INNER JOIN users_subscriptions_affiliations_payouts usap
|
||||
ON usap.charge_id = original_charges.id
|
||||
-- only include payouts that haven't been issued yet (not available as of now)
|
||||
INNER JOIN payouts_values pv
|
||||
ON pv.id = usap.payout_value_id
|
||||
AND pv.date_available > NOW()
|
||||
WHERE
|
||||
refund_charges.status = 'succeeded'
|
||||
-- make sure it's actually a refund charge
|
||||
AND refund_charges.charge_type = 'refund'
|
||||
"#
|
||||
)
|
||||
.fetch_all(&mut *txn)
|
||||
.await
|
||||
.wrap_err("failed to fetch refundable affiliate payouts")?;
|
||||
|
||||
if refundable_payouts.is_empty() {
|
||||
txn.commit()
|
||||
.await
|
||||
.wrap_err("failed to commit transaction")?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut usap_ids = Vec::new();
|
||||
let mut payout_value_ids = Vec::new();
|
||||
|
||||
for payout in refundable_payouts {
|
||||
usap_ids.push(payout.usap_id);
|
||||
payout_value_ids.push(payout.payout_value_id);
|
||||
}
|
||||
|
||||
// Delete the affiliate payout associations
|
||||
sqlx::query!(
|
||||
"
|
||||
DELETE FROM users_subscriptions_affiliations_payouts
|
||||
WHERE id = ANY($1::bigint[])
|
||||
",
|
||||
&usap_ids[..]
|
||||
)
|
||||
.execute(&mut *txn)
|
||||
.await
|
||||
.wrap_err("failed to delete affiliate payout associations")?;
|
||||
|
||||
// Delete the payout values
|
||||
sqlx::query!(
|
||||
"
|
||||
DELETE FROM payouts_values
|
||||
WHERE id = ANY($1::bigint[])
|
||||
",
|
||||
&payout_value_ids[..]
|
||||
)
|
||||
.execute(&mut *txn)
|
||||
.await
|
||||
.wrap_err("failed to delete payout values")?;
|
||||
|
||||
txn.commit()
|
||||
.await
|
||||
.wrap_err("failed to commit transaction")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -36,6 +36,11 @@ use tracing::{error, info, warn};
|
||||
|
||||
pub mod mural;
|
||||
|
||||
mod affiliate;
|
||||
pub use affiliate::{
|
||||
process_affiliate_payouts, remove_payouts_for_refunded_charges,
|
||||
};
|
||||
|
||||
pub struct PayoutsQueue {
|
||||
credential: RwLock<Option<PayPalCredentials>>,
|
||||
payout_options: RwLock<Option<PayoutMethods>>,
|
||||
|
||||
@@ -4,9 +4,11 @@ use crate::database::models::charge_item::DBCharge;
|
||||
use crate::database::models::ids::DBUserSubscriptionId;
|
||||
use crate::database::models::notification_item::NotificationBuilder;
|
||||
use crate::database::models::products_tax_identifier_item::product_info_by_product_price_id;
|
||||
use crate::database::models::users_subscriptions_affiliations::DBUsersSubscriptionsAffiliations;
|
||||
use crate::database::models::users_subscriptions_credits::DBUserSubscriptionCredit;
|
||||
use crate::database::models::{
|
||||
charge_item, generate_charge_id, product_item, user_subscription_item,
|
||||
DBAffiliateCodeId, charge_item, generate_charge_id, product_item,
|
||||
user_subscription_item,
|
||||
};
|
||||
use crate::database::redis::RedisPool;
|
||||
use crate::models::billing::{
|
||||
@@ -14,6 +16,7 @@ use crate::models::billing::{
|
||||
Product, ProductMetadata, ProductPrice, SubscriptionMetadata,
|
||||
SubscriptionStatus, UserSubscription,
|
||||
};
|
||||
use crate::models::ids::AffiliateCodeId;
|
||||
use crate::models::notifications::NotificationBody;
|
||||
use crate::models::pats::Scopes;
|
||||
use crate::models::users::Badges;
|
||||
@@ -793,6 +796,11 @@ pub async fn edit_subscription(
|
||||
..
|
||||
} if open_charge.status == ChargeStatus::Failed => {
|
||||
if cancelled {
|
||||
DBUsersSubscriptionsAffiliations::deactivate(
|
||||
subscription.id,
|
||||
&mut *transaction,
|
||||
)
|
||||
.await?;
|
||||
open_charge.status = ChargeStatus::Cancelled;
|
||||
} else {
|
||||
// Forces another resubscription attempt
|
||||
@@ -812,6 +820,11 @@ pub async fn edit_subscription(
|
||||
) =>
|
||||
{
|
||||
open_charge.status = if cancelled {
|
||||
DBUsersSubscriptionsAffiliations::deactivate(
|
||||
subscription.id,
|
||||
&mut *transaction,
|
||||
)
|
||||
.await?;
|
||||
ChargeStatus::Cancelled
|
||||
} else {
|
||||
ChargeStatus::Open
|
||||
@@ -1508,7 +1521,15 @@ pub enum ChargeRequestType {
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum PaymentRequestMetadata {
|
||||
pub struct PaymentRequestMetadata {
|
||||
#[serde(flatten)]
|
||||
pub kind: PaymentRequestMetadataKind,
|
||||
pub affiliate_code: Option<AffiliateCodeId>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum PaymentRequestMetadataKind {
|
||||
Pyro {
|
||||
server_name: Option<String>,
|
||||
server_region: Option<String>,
|
||||
@@ -2046,12 +2067,12 @@ pub async fn stripe_webhook(
|
||||
} else {
|
||||
let (server_name, server_region, source) =
|
||||
if let Some(
|
||||
PaymentRequestMetadata::Pyro {
|
||||
ref server_name,
|
||||
ref server_region,
|
||||
ref source,
|
||||
PaymentRequestMetadataKind::Pyro {
|
||||
server_name,
|
||||
server_region,
|
||||
source,
|
||||
},
|
||||
) = metadata.payment_metadata
|
||||
) = metadata.payment_metadata.as_ref().map(|m| &m.kind)
|
||||
{
|
||||
(
|
||||
server_name.clone(),
|
||||
@@ -2235,6 +2256,22 @@ pub async fn stripe_webhook(
|
||||
}
|
||||
.upsert(&mut transaction)
|
||||
.await?;
|
||||
|
||||
if let Some(affiliate_code) = metadata
|
||||
.payment_metadata
|
||||
.as_ref()
|
||||
.and_then(|m| m.affiliate_code)
|
||||
{
|
||||
DBUsersSubscriptionsAffiliations {
|
||||
subscription_id: subscription.id,
|
||||
affiliate_code: DBAffiliateCodeId::from(
|
||||
affiliate_code,
|
||||
),
|
||||
deactivated_at: None,
|
||||
}
|
||||
.insert(&mut *transaction)
|
||||
.await?;
|
||||
}
|
||||
};
|
||||
|
||||
subscription.status = SubscriptionStatus::Provisioned;
|
||||
|
||||
Reference in New Issue
Block a user