Fix Mural payout status syncing (#4853)

* Fix Mural payout status syncing

* Make Mural payout code more resilient

* prepare sqlx

* fix test
This commit is contained in:
aecsocket
2025-12-08 20:34:41 +00:00
committed by GitHub
parent cfd2977c21
commit 9aa06fbc26
22 changed files with 1171 additions and 1151 deletions

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE payouts\n SET status = $1\n WHERE id = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Int8"
]
},
"nullable": []
},
"hash": "76cf88116014e3151db2f85b0bd30dba70ae62f9f922ed711dbb85fcf4ec5f7c"
}

View File

@@ -72,7 +72,7 @@ lettre = { workspace = true }
meilisearch-sdk = { workspace = true, features = ["reqwest"] }
modrinth-maxmind = { workspace = true }
modrinth-util = { workspace = true, features = ["decimal", "utoipa"] }
muralpay = { workspace = true, features = ["mock", "utoipa"] }
muralpay = { workspace = true, features = ["client", "mock", "utoipa"] }
murmur2 = { workspace = true }
paste = { workspace = true }
path-util = { workspace = true }

View File

@@ -10,7 +10,6 @@ use crate::search::indexing::index_projects;
use crate::util::anrok;
use crate::{database, search};
use clap::ValueEnum;
use muralpay::MuralPay;
use sqlx::Postgres;
use tracing::{error, info, warn};
@@ -39,7 +38,7 @@ impl BackgroundTask {
stripe_client: stripe::Client,
anrok_client: anrok::Client,
email_queue: EmailQueue,
mural_client: MuralPay,
mural_client: muralpay::Client,
) {
use BackgroundTask::*;
match self {
@@ -207,7 +206,10 @@ pub async fn payouts(
info!("Done running payouts");
}
pub async fn sync_payout_statuses(pool: sqlx::Pool<Postgres>, mural: MuralPay) {
pub async fn sync_payout_statuses(
pool: sqlx::Pool<Postgres>,
mural: muralpay::Client,
) {
// Mural sets a max limit of 100 for search payouts endpoint
const LIMIT: u32 = 100;

View File

@@ -150,7 +150,7 @@ pub struct TremendousForexResponse {
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct MuralPayDetails {
pub payout_details: MuralPayoutRequest,
pub recipient_info: muralpay::PayoutRecipientInfo,
pub recipient_info: muralpay::CreatePayoutRecipientInfo,
}
impl PayoutMethodType {

View File

@@ -21,7 +21,6 @@ use dashmap::DashMap;
use eyre::{Result, eyre};
use futures::TryStreamExt;
use modrinth_util::decimal::Decimal2dp;
use muralpay::MuralPay;
use reqwest::Method;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::{Decimal, RoundingStrategy, dec};
@@ -48,7 +47,7 @@ pub struct PayoutsQueue {
}
pub struct MuralPayConfig {
pub client: MuralPay,
pub client: muralpay::Client,
pub source_account_id: muralpay::AccountId,
}
@@ -77,11 +76,11 @@ impl Default for PayoutsQueue {
}
}
pub fn create_muralpay_client() -> Result<MuralPay> {
pub fn create_muralpay_client() -> Result<muralpay::Client> {
let api_url = env_var("MURALPAY_API_URL")?;
let api_key = env_var("MURALPAY_API_KEY")?;
let transfer_api_key = env_var("MURALPAY_TRANSFER_API_KEY")?;
Ok(MuralPay::new(api_url, api_key, Some(transfer_api_key)))
Ok(muralpay::Client::new(api_url, api_key, transfer_api_key))
}
pub fn create_muralpay() -> Result<MuralPayConfig> {

View File

@@ -3,7 +3,7 @@ use chrono::Utc;
use eyre::{Result, eyre};
use futures::{StreamExt, TryFutureExt, stream::FuturesUnordered};
use modrinth_util::decimal::Decimal2dp;
use muralpay::{MuralError, MuralPay, TokenFeeRequest};
use muralpay::{MuralError, TokenFeeRequest};
use rust_decimal::{Decimal, prelude::ToPrimitive};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@@ -69,7 +69,7 @@ impl PayoutsQueue {
gross_amount: Decimal2dp,
fees: PayoutFees,
payout_details: MuralPayoutRequest,
recipient_info: muralpay::PayoutRecipientInfo,
recipient_info: muralpay::CreatePayoutRecipientInfo,
gotenberg: &GotenbergClient,
) -> Result<muralpay::PayoutRequest, ApiError> {
let muralpay = self.muralpay.load();
@@ -183,36 +183,27 @@ impl PayoutsQueue {
),
})?;
// try to immediately execute the payout request...
// use a poor man's try/catch block using this `async move {}`
// to catch any errors within this block
let result = async move {
muralpay
.client
.execute_payout_request(payout_request.id)
.await
.wrap_internal_err("failed to execute payout request")?;
eyre::Ok(())
}
.await;
// and if it fails, make sure to immediately cancel it -
// we don't want floating payout requests
if let Err(err) = result {
muralpay
.client
.cancel_payout_request(payout_request.id)
.await
.wrap_internal_err_with(|| {
eyre!("failed to cancel unexecuted payout request\noriginal error: {err:#?}")
})?;
return Err(ApiError::Internal(err));
}
Ok(payout_request)
}
pub async fn cancel_muralpay_payout_request(
pub async fn execute_mural_payout_request(
&self,
id: muralpay::PayoutRequestId,
) -> Result<(), ApiError> {
let muralpay = self.muralpay.load();
let muralpay = muralpay
.as_ref()
.wrap_internal_err("Mural Pay client not available")?;
muralpay
.client
.execute_payout_request(id)
.await
.wrap_internal_err("failed to execute payout request")?;
Ok(())
}
pub async fn cancel_mural_payout_request(
&self,
id: muralpay::PayoutRequestId,
) -> Result<()> {
@@ -263,7 +254,7 @@ impl PayoutsQueue {
/// Mural state, and updates the payout status.
pub async fn sync_pending_payouts_from_mural(
db: &PgPool,
mural: &MuralPay,
mural: &muralpay::Client,
limit: u32,
) -> eyre::Result<()> {
#[derive(Debug)]
@@ -369,9 +360,13 @@ pub async fn sync_pending_payouts_from_mural(
/// Queries Mural for canceled or failed payouts, and updates the corresponding
/// Labrinth payouts' statuses.
///
/// This will update:
/// - Mural payout requests which are failed or canceled
/// - Mural payout requests where all of the payouts are failed or canceled
pub async fn sync_failed_mural_payouts_to_labrinth(
db: &PgPool,
mural: &MuralPay,
mural: &muralpay::Client,
limit: u32,
) -> eyre::Result<()> {
info!("Syncing failed Mural payouts to Labrinth");
@@ -380,12 +375,7 @@ pub async fn sync_failed_mural_payouts_to_labrinth(
loop {
let search_resp = mural
.search_payout_requests(
Some(muralpay::PayoutStatusFilter::PayoutStatus {
statuses: vec![
muralpay::PayoutStatus::Canceled,
muralpay::PayoutStatus::Failed,
],
}),
None,
Some(muralpay::SearchParams {
limit: Some(u64::from(limit)),
next_id,
@@ -395,48 +385,51 @@ pub async fn sync_failed_mural_payouts_to_labrinth(
.wrap_internal_err(
"failed to fetch failed payout requests from Mural",
)?;
next_id = search_resp.next_id;
if search_resp.results.is_empty() {
break;
}
let num_canceled = search_resp
.results
.iter()
.filter(|p| p.status == muralpay::PayoutStatus::Canceled)
.count();
let num_failed = search_resp
.results
.iter()
.filter(|p| p.status == muralpay::PayoutStatus::Failed)
.count();
info!(
"Found {num_canceled} canceled and {num_failed} failed Mural payouts"
);
next_id = search_resp.next_id;
let mut payout_platform_ids = Vec::<String>::new();
let mut payout_new_statuses = Vec::<String>::new();
for payout_req in search_resp.results {
let new_payout_status = match payout_req.status {
muralpay::PayoutStatus::Canceled => PayoutStatus::Cancelled,
muralpay::PayoutStatus::Failed => PayoutStatus::Failed,
_ => {
warn!(
"Found payout {} with status {:?}, which should have been filtered out by our Mural request - Mural bug",
payout_req.id, payout_req.status
for payout_request in search_resp.results {
let payout_platform_id = payout_request.id;
let new_payout_status = match payout_request.status {
muralpay::PayoutStatus::Canceled => {
trace!(
"- Payout request {payout_platform_id} set to {} because it is cancelled in Mural",
PayoutStatus::Cancelled
);
continue;
Some(PayoutStatus::Cancelled)
}
muralpay::PayoutStatus::Failed => {
trace!(
"- Payout request {payout_platform_id} set to {} because it is failed in Mural",
PayoutStatus::Failed
);
Some(PayoutStatus::Failed)
}
// this will also fail any payout request which has no payouts
_ if payout_request
.payouts
.iter()
.all(payout_should_be_failed) =>
{
trace!(
"- Payout request {payout_platform_id} set to {} because all of its payouts are failed",
PayoutStatus::Failed
);
Some(PayoutStatus::Failed)
}
_ => None,
};
let payout_platform_id = payout_req.id;
trace!(
"- Payout {payout_platform_id} set to {new_payout_status:?}",
);
payout_platform_ids.push(payout_platform_id.to_string());
payout_new_statuses.push(new_payout_status.to_string());
if let Some(new_payout_status) = new_payout_status {
payout_platform_ids.push(payout_platform_id.to_string());
payout_new_statuses.push(new_payout_status.to_string());
}
}
let result = sqlx::query!(
@@ -470,6 +463,17 @@ pub async fn sync_failed_mural_payouts_to_labrinth(
Ok(())
}
fn payout_should_be_failed(payout: &muralpay::Payout) -> bool {
matches!(
payout.details,
muralpay::PayoutDetails::Fiat(muralpay::FiatPayoutDetails {
fiat_payout_status: muralpay::FiatPayoutStatus::Failed { .. }
| muralpay::FiatPayoutStatus::Refunded { .. },
..
})
)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -477,8 +481,8 @@ mod tests {
api_v3::ApiV3,
environment::{TestEnvironment, with_test_environment},
};
use muralpay::MuralPay;
use muralpay::mock::MuralPayMock;
use muralpay::MuralPayMock;
use rust_decimal::dec;
fn create_mock_payout_request(
id: &str,
@@ -494,12 +498,51 @@ mod tests {
transaction_hash: None,
memo: None,
status,
payouts: vec![],
payouts: vec![Payout {
id: PayoutId(uuid::Uuid::new_v4()),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
amount: TokenAmount {
token_amount: dec!(10.00),
token_symbol: "USDC".into(),
},
details: PayoutDetails::Fiat(FiatPayoutDetails {
fiat_and_rail_code: FiatAndRailCode::Usd,
fiat_payout_status: FiatPayoutStatus::Pending {
initiated_at: chrono::Utc::now(),
},
fiat_amount: FiatAmount {
fiat_amount: dec!(10.00),
fiat_currency_code: CurrencyCode::Usd,
},
transaction_fee: TokenAmount {
token_amount: dec!(1.00),
token_symbol: "USDC".into(),
},
exchange_fee_percentage: dec!(0.0),
exchange_rate: dec!(1.0),
fee_total: TokenAmount {
token_amount: dec!(1.00),
token_symbol: "USDC".into(),
},
developer_fee: None,
}),
recipient_info: PayoutRecipientInfo::Inline {
name: "John Smith".into(),
details: InlineRecipientDetails::Fiat {
details: InlineFiatRecipientDetails {
fiat_currency_code: CurrencyCode::Usd,
bank_name: "Foo Bank".into(),
truncated_bank_account_number: "1234".into(),
},
},
},
}],
}
}
fn create_mock_muralpay() -> MuralPay {
MuralPay::from_mock(MuralPayMock {
fn create_mock_muralpay() -> muralpay::Client {
muralpay::Client::from_mock(MuralPayMock {
get_payout_request: Box::new(|_id| {
Err(muralpay::MuralError::Api(muralpay::ApiError {
error_instance_id: uuid::Uuid::new_v4(),
@@ -643,7 +686,7 @@ mod tests {
})
});
let mock_client = MuralPay::from_mock(mock);
let mock_client = muralpay::Client::from_mock(mock);
// Run the function
let result =
@@ -756,7 +799,7 @@ mod tests {
})
});
let mock_client = MuralPay::from_mock(mock);
let mock_client = muralpay::Client::from_mock(mock);
// Run the function
let result =
@@ -818,7 +861,7 @@ mod tests {
})
});
let mock_client = MuralPay::from_mock(mock);
let mock_client = muralpay::Client::from_mock(mock);
// Run the function - should handle this gracefully
sync_failed_mural_payouts_to_labrinth(

View File

@@ -28,7 +28,7 @@ use rust_decimal::{Decimal, RoundingStrategy};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::Sha256;
use sqlx::PgPool;
use sqlx::{PgPool, PgTransaction};
use std::collections::HashMap;
use tokio_stream::StreamExt;
use tracing::error;
@@ -623,29 +623,22 @@ pub async fn create_payout(
total_fee: fees.total_fee(),
sent_to_method,
payouts_queue: &payouts_queue,
db: PgPool::clone(&pool),
transaction,
};
let payout_item = match &body.method {
match &body.method {
PayoutMethodRequest::PayPal | PayoutMethodRequest::Venmo => {
paypal_payout(payout_cx).await?
paypal_payout(payout_cx).await?;
}
PayoutMethodRequest::Tremendous { method_details } => {
tremendous_payout(payout_cx, method_details).await?
tremendous_payout(payout_cx, method_details).await?;
}
PayoutMethodRequest::MuralPay { method_details } => {
mural_pay_payout(payout_cx, method_details, &gotenberg).await?
mural_pay_payout(payout_cx, method_details, &gotenberg).await?;
}
};
}
payout_item
.insert(&mut transaction)
.await
.wrap_internal_err("failed to insert payout")?;
transaction
.commit()
.await
.wrap_internal_err("failed to commit transaction")?;
crate::database::models::DBUser::clear_caches(&[(user.id, None)], &redis)
.await
.wrap_internal_err("failed to clear user caches")?;
@@ -653,7 +646,6 @@ pub async fn create_payout(
Ok(())
}
#[derive(Clone, Copy)]
struct PayoutContext<'a> {
body: &'a Withdrawal,
user: &'a DBUser,
@@ -666,6 +658,8 @@ struct PayoutContext<'a> {
total_fee: Decimal2dp,
sent_to_method: Decimal2dp,
payouts_queue: &'a PayoutsQueue,
db: PgPool,
transaction: PgTransaction<'a>,
}
fn get_verified_email(user: &DBUser) -> Result<&str, ApiError> {
@@ -692,12 +686,14 @@ async fn tremendous_payout(
total_fee,
sent_to_method,
payouts_queue,
db: _,
mut transaction,
}: PayoutContext<'_>,
TremendousDetails {
delivery_email,
currency,
}: &TremendousDetails,
) -> Result<DBPayout, ApiError> {
) -> Result<(), ApiError> {
let user_email = get_verified_email(user)?;
#[derive(Deserialize)]
@@ -773,7 +769,7 @@ async fn tremendous_payout(
let platform_id = res.order.rewards.first().map(|reward| reward.id.clone());
Ok(DBPayout {
DBPayout {
id: payout_id,
user_id: user.id,
created: Utc::now(),
@@ -784,7 +780,17 @@ async fn tremendous_payout(
method_id: Some(body.method_id.clone()),
method_address: Some(user_email.to_string()),
platform_id,
})
}
.insert(&mut transaction)
.await
.wrap_internal_err("failed to insert payout")?;
transaction
.commit()
.await
.wrap_internal_err("failed to commit transaction")?;
Ok(())
}
async fn mural_pay_payout(
@@ -798,12 +804,35 @@ async fn mural_pay_payout(
total_fee,
sent_to_method: _,
payouts_queue,
db,
mut transaction,
}: PayoutContext<'_>,
details: &MuralPayDetails,
gotenberg: &GotenbergClient,
) -> Result<DBPayout, ApiError> {
) -> Result<(), ApiError> {
let user_email = get_verified_email(user)?;
let method_id = match &details.payout_details {
MuralPayoutRequest::Blockchain { .. } => {
"blockchain-usdc-polygon".to_string()
}
MuralPayoutRequest::Fiat {
fiat_and_rail_details,
..
} => fiat_and_rail_details.code().to_string(),
};
// Once the Mural payout request has been created successfully,
// then we *must* commit the payout into the DB,
// to link the Mural payout request to the `payout` row.
// Even if we can't execute the payout.
// For this, we immediately insert and commit the txn.
// Otherwise if we don't put it into the DB, we've got a ghost Mural
// payout with no related database entry.
//
// However, this doesn't mean that the payout will definitely go through.
// For this, we need to execute it, and handle errors.
let payout_request = payouts_queue
.create_muralpay_payout_request(
payout_id,
@@ -816,22 +845,13 @@ async fn mural_pay_payout(
)
.await?;
let method_id = match &details.payout_details {
MuralPayoutRequest::Blockchain { .. } => {
"blockchain-usdc-polygon".to_string()
}
MuralPayoutRequest::Fiat {
fiat_and_rail_details,
..
} => fiat_and_rail_details.code().to_string(),
};
Ok(DBPayout {
let payout = DBPayout {
id: payout_id,
user_id: user.id,
created: Utc::now(),
// after the payout has been successfully executed,
// we wait for Mural's confirmation that the funds have been delivered
// done in `SyncPayoutStatuses` background task
status: PayoutStatus::InTransit,
amount: amount_minus_fee.get(),
fee: Some(total_fee.get()),
@@ -839,7 +859,61 @@ async fn mural_pay_payout(
method_id: Some(method_id),
method_address: Some(user_email.to_string()),
platform_id: Some(payout_request.id.to_string()),
})
};
payout
.insert(&mut transaction)
.await
.wrap_internal_err("failed to insert payout")?;
transaction
.commit()
.await
.wrap_internal_err("failed to commit payout insert transaction")?;
// try to immediately execute the payout request...
// use a poor man's try/catch block using this `async move {}`
// to catch any errors within this block
let result = async move {
payouts_queue
.execute_mural_payout_request(payout_request.id)
.await
.wrap_internal_err("failed to execute payout request")?;
eyre::Ok(())
}
.await;
// and if it fails, make sure to immediately cancel it -
// we don't want floating payout requests
if let Err(err) = result {
if let Err(err) = sqlx::query!(
"
UPDATE payouts
SET status = $1
WHERE id = $2
",
PayoutStatus::Failed.as_str(),
payout.id as _,
)
.execute(&db)
.await
{
error!(
"Created a Mural payout request, but failed to execute it, \
and failed to mark the payout as failed: {err:#?}"
);
}
payouts_queue
.cancel_mural_payout_request(payout_request.id)
.await
.wrap_internal_err_with(|| {
eyre!("failed to cancel unexecuted payout request\noriginal error: {err:#?}")
})?;
return Err(ApiError::Internal(err));
}
Ok(())
}
async fn paypal_payout(
@@ -853,8 +927,10 @@ async fn paypal_payout(
total_fee,
sent_to_method,
payouts_queue,
db: _,
mut transaction,
}: PayoutContext<'_>,
) -> Result<DBPayout, ApiError> {
) -> Result<(), ApiError> {
let (wallet, wallet_type, address, display_address) =
if matches!(body.method, PayoutMethodRequest::Venmo) {
if let Some(venmo) = &user.venmo_handle {
@@ -965,7 +1041,7 @@ async fn paypal_payout(
let platform_id = Some(data.payout_item_id.clone());
Ok(DBPayout {
DBPayout {
id: payout_id,
user_id: user.id,
created: Utc::now(),
@@ -976,7 +1052,17 @@ async fn paypal_payout(
method_id: Some(body.method_id.clone()),
method_address: Some(display_address.clone()),
platform_id,
})
}
.insert(&mut transaction)
.await
.wrap_internal_err("failed to insert payout")?;
transaction
.commit()
.await
.wrap_internal_err("failed to commit transaction")?;
Ok(())
}
/// User performing a payout-related action.
@@ -1201,7 +1287,7 @@ pub async fn cancel_payout(
.parse::<muralpay::PayoutRequestId>()
.wrap_request_err("invalid payout request ID")?;
payouts
.cancel_muralpay_payout_request(payout_request_id)
.cancel_mural_payout_request(payout_request_id)
.await
.wrap_internal_err(
"failed to cancel payout request",