Task to retroactively update Mural statuses (#4769)

* Task to retroactively update Mural statuses

* cargo sqlx prepare

* wip: add tests

* Prepare

* Fix up test

* start on muralpay mock

* Move mocking to muralpay crate
This commit is contained in:
aecsocket
2025-11-13 18:16:41 +00:00
committed by GitHub
parent 70e2138248
commit c27f787c91
24 changed files with 906 additions and 10 deletions

View File

@@ -71,16 +71,19 @@ impl Default for PayoutsQueue {
}
}
fn create_muralpay() -> Result<MuralPayConfig> {
pub fn create_muralpay_client() -> Result<MuralPay> {
let api_url = env_var("MURALPAY_API_URL")?;
let api_key = env_var("MURALPAY_API_KEY")?;
let transfer_api_key = env_var("MURALPAY_TRANSFER_API_KEY")?;
Ok(MuralPay::new(api_url, api_key, Some(transfer_api_key)))
}
pub fn create_muralpay() -> Result<MuralPayConfig> {
let client = create_muralpay_client()?;
let source_account_id = env_var("MURALPAY_SOURCE_ACCOUNT_ID")?
.parse::<muralpay::AccountId>()
.wrap_err("failed to parse source account ID")?;
let client = MuralPay::new(api_url, api_key, Some(transfer_api_key));
Ok(MuralPayConfig {
client,
source_account_id,

View File

@@ -1,12 +1,16 @@
use ariadne::ids::UserId;
use chrono::Utc;
use eyre::{Result, eyre};
use muralpay::{MuralError, TokenFeeRequest};
use futures::{StreamExt, TryFutureExt, stream::FuturesUnordered};
use muralpay::{MuralError, MuralPay, TokenFeeRequest};
use rust_decimal::{Decimal, prelude::ToPrimitive};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing::warn;
use crate::{
database::models::DBPayoutId,
models::payouts::{PayoutMethodType, PayoutStatus},
queue::payouts::{AccountBalance, PayoutFees, PayoutsQueue},
routes::ApiError,
util::{
@@ -73,7 +77,7 @@ impl PayoutsQueue {
.wrap_internal_err("Mural Pay client not available")?;
let payout_details = match payout_details {
MuralPayoutRequest::Fiat {
crate::queue::payouts::mural::MuralPayoutRequest::Fiat {
bank_name,
bank_account_owner,
fiat_and_rail_details,
@@ -83,7 +87,9 @@ impl PayoutsQueue {
developer_fee: None,
fiat_and_rail_details,
},
MuralPayoutRequest::Blockchain { wallet_address } => {
crate::queue::payouts::mural::MuralPayoutRequest::Blockchain {
wallet_address,
} => {
muralpay::CreatePayoutDetails::Blockchain {
wallet_details: muralpay::WalletDetails {
// only Polygon chain is currently supported
@@ -251,3 +257,555 @@ impl PayoutsQueue {
}))
}
}
/// Finds Labrinth payouts which are not complete, fetches their corresponding
/// Mural state, and updates the payout status.
pub async fn sync_pending_payouts_from_mural(
db: &PgPool,
mural: &MuralPay,
limit: u32,
) -> eyre::Result<()> {
#[derive(Debug)]
struct UpdatePayoutOp {
payout_id: i64,
status: PayoutStatus,
}
let mut txn = db
.begin()
.await
.wrap_internal_err("failed to begin transaction")?;
let rows = sqlx::query!(
"
SELECT id, platform_id FROM payouts
WHERE
method = $1
AND status = ANY($2::text[])
LIMIT $3
",
&PayoutMethodType::MuralPay.to_string(),
&[
PayoutStatus::InTransit,
PayoutStatus::Unknown,
PayoutStatus::Cancelling
]
.iter()
.map(|s| s.to_string())
.collect::<Vec<String>>(),
i64::from(limit),
)
.fetch_all(&mut *txn)
.await
.wrap_internal_err("failed to fetch incomplete Mural payouts")?;
let futs = rows.into_iter().map(|row| async move {
let platform_id = row.platform_id.wrap_err("no platform ID")?;
let payout_request_id = platform_id.parse::<muralpay::PayoutRequestId>()
.wrap_err_with(|| eyre!("platform ID '{platform_id:?}' is not a valid payout request ID"))?;
let payout_request = mural.get_payout_request(payout_request_id).await
.wrap_err_with(|| eyre!("failed to fetch payout request {payout_request_id}"))?;
let new_payout_status = match payout_request.status {
muralpay::PayoutStatus::Canceled => Some(PayoutStatus::Cancelled),
muralpay::PayoutStatus::Executed => Some(PayoutStatus::Success),
muralpay::PayoutStatus::Failed => Some(PayoutStatus::Failed),
_ => None,
};
if let Some(status) = new_payout_status {
eyre::Ok(Some(UpdatePayoutOp {
payout_id: row.id,
status
}))
} else {
eyre::Ok(None)
}
}.map_err(move |err| eyre!(err).wrap_err(eyre!("failed to update payout with ID '{}'", row.id))));
let mut futs = futs.collect::<FuturesUnordered<_>>();
let mut payout_ids = Vec::<i64>::new();
let mut payout_statuses = Vec::<String>::new();
while let Some(result) = futs.next().await {
let op = match result {
Ok(Some(op)) => op,
Ok(None) => continue,
Err(err) => {
warn!("Failed to update payout: {err:#?}");
continue;
}
};
payout_ids.push(op.payout_id);
payout_statuses.push(op.status.to_string());
}
sqlx::query!(
"
UPDATE payouts
SET status = u.status
FROM UNNEST($1::bigint[], $2::varchar[]) AS u(id, status)
WHERE payouts.id = u.id
",
&payout_ids,
&payout_statuses,
)
.execute(&mut *txn)
.await
.wrap_internal_err("failed to update payout statuses")?;
txn.commit()
.await
.wrap_internal_err("failed to commit transaction")?;
Ok(())
}
/// Queries Mural for canceled or failed payouts, and updates the corresponding
/// Labrinth payouts' statuses.
pub async fn sync_failed_mural_payouts_to_labrinth(
db: &PgPool,
mural: &MuralPay,
limit: u32,
) -> eyre::Result<()> {
let mut next_id = None;
loop {
let search_resp = mural
.search_payout_requests(
Some(muralpay::PayoutStatusFilter::PayoutStatus {
statuses: vec![
muralpay::PayoutStatus::Canceled,
muralpay::PayoutStatus::Failed,
],
}),
Some(muralpay::SearchParams {
limit: Some(u64::from(limit)),
next_id,
}),
)
.await
.wrap_internal_err(
"failed to fetch failed payout requests from Mural",
)?;
next_id = search_resp.next_id;
if search_resp.results.is_empty() {
break;
}
let mut payout_platform_id = Vec::<String>::new();
let mut payout_new_status = Vec::<String>::new();
for payout_req in search_resp.results {
let new_payout_status = match payout_req.status {
muralpay::PayoutStatus::Canceled => PayoutStatus::Cancelled,
muralpay::PayoutStatus::Failed => PayoutStatus::Failed,
_ => {
warn!(
"Found payout {} with status {:?}, which should have been filtered out by our Mural request - Mural bug",
payout_req.id, payout_req.status
);
continue;
}
};
payout_platform_id.push(payout_req.id.to_string());
payout_new_status.push(new_payout_status.to_string());
}
sqlx::query!(
"
UPDATE payouts
SET status = u.status
FROM UNNEST($1::text[], $2::text[]) AS u(platform_id, status)
WHERE
payouts.method = $3
AND payouts.platform_id = u.platform_id
",
&payout_platform_id,
&payout_new_status,
PayoutMethodType::MuralPay.as_str(),
)
.execute(db)
.await
.wrap_internal_err("failed to update payout statuses")?;
if next_id.is_none() {
break;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::{
api_v3::ApiV3,
environment::{TestEnvironment, with_test_environment},
};
use muralpay::MuralPay;
use muralpay::mock::MuralPayMock;
fn create_mock_payout_request(
id: &str,
status: muralpay::PayoutStatus,
) -> muralpay::PayoutRequest {
use muralpay::*;
PayoutRequest {
id: PayoutRequestId(id.parse().unwrap()),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
source_account_id: AccountId(uuid::Uuid::new_v4()),
transaction_hash: None,
memo: None,
status,
payouts: vec![],
}
}
fn create_mock_muralpay() -> MuralPay {
MuralPay::from_mock(MuralPayMock {
get_payout_request: Box::new(|_id| {
Err(muralpay::MuralError::Api(muralpay::ApiError {
error_instance_id: uuid::Uuid::new_v4(),
name: "Not found".to_string(),
message: "Payout request not found".to_string(),
details: vec![],
params: std::collections::HashMap::new(),
}))
}),
search_payout_requests: Box::new(|_filter, _params| {
Ok(muralpay::SearchResponse {
total: 0,
next_id: None,
results: vec![],
})
}),
..Default::default()
})
}
async fn setup_test_db_with_payouts(
db: &sqlx::PgPool,
payouts: Vec<(i64, String, PayoutStatus)>,
) -> Result<(), eyre::Error> {
for (id, platform_id, status) in payouts {
sqlx::query!(
"
INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)
VALUES ($1, $2, $3, $4, $5, 10.0, NOW())
",
id,
PayoutMethodType::MuralPay.as_str(),
platform_id,
status.as_str(),
1i64, // user_id
)
.execute(db)
.await?;
}
Ok(())
}
#[actix_rt::test]
async fn test_sync_pending_payouts_from_mural_success() {
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
let db = &env.db.pool;
// Setup test data
let uuid1 = uuid::Uuid::new_v4().to_string();
let uuid2 = uuid::Uuid::new_v4().to_string();
let uuid3 = uuid::Uuid::new_v4().to_string();
let uuid4 = uuid::Uuid::new_v4().to_string();
setup_test_db_with_payouts(
db,
vec![
(1, uuid1.clone(), PayoutStatus::InTransit),
(2, uuid2.clone(), PayoutStatus::Unknown),
(3, uuid3.clone(), PayoutStatus::Cancelling),
(4, uuid4.clone(), PayoutStatus::InTransit), // This one won't change
],
)
.await
.unwrap();
// Verify setup
let updated_payouts = sqlx::query!(
r#"
SELECT
id,
status AS "status: PayoutStatus"
FROM payouts
ORDER BY id
"#
)
.fetch_all(db)
.await
.unwrap();
assert_eq!(updated_payouts.len(), 4);
assert_eq!(updated_payouts[0].status, PayoutStatus::InTransit);
assert_eq!(updated_payouts[1].status, PayoutStatus::Unknown);
assert_eq!(updated_payouts[2].status, PayoutStatus::Cancelling);
assert_eq!(updated_payouts[3].status, PayoutStatus::InTransit);
// Setup mock client with proper responses
let mut mock = MuralPayMock::default();
// Create mock payout requests
let payout1 = create_mock_payout_request(
&uuid1,
muralpay::PayoutStatus::Executed,
);
let payout2 = create_mock_payout_request(
&uuid2,
muralpay::PayoutStatus::Canceled,
);
let payout3 = create_mock_payout_request(
&uuid3,
muralpay::PayoutStatus::Failed,
);
let payout4 = create_mock_payout_request(
&uuid4,
muralpay::PayoutStatus::Pending,
);
// Mock get_payout_request
let payout_requests = std::collections::HashMap::from([
(uuid1.clone(), payout1.clone()),
(uuid2.clone(), payout2.clone()),
(uuid3.clone(), payout3.clone()),
(uuid4.clone(), payout4.clone()),
]);
mock.get_payout_request = Box::new(move |id| {
let id_str = id.to_string();
match payout_requests.get(&id_str) {
Some(request) => Ok(request.clone()),
None => {
Err(muralpay::MuralError::Api(muralpay::ApiError {
error_instance_id: uuid::Uuid::new_v4(),
name: "Not found".to_string(),
message: "Payout request not found".to_string(),
details: vec![],
params: std::collections::HashMap::new(),
}))
}
}
});
// Mock search_payout_requests
mock.search_payout_requests = Box::new(move |_filter, _params| {
Ok(muralpay::SearchResponse {
total: 4,
results: vec![
payout1.clone(),
payout2.clone(),
payout3.clone(),
payout4.clone(),
],
next_id: None,
})
});
let mock_client = MuralPay::from_mock(mock);
// Run the function
let result =
sync_pending_payouts_from_mural(db, &mock_client, 10).await;
assert!(result.is_ok());
// Verify results
let updated_payouts = sqlx::query!(
r#"
SELECT
id,
status AS "status: PayoutStatus"
FROM payouts
ORDER BY id
"#
)
.fetch_all(db)
.await
.unwrap();
assert_eq!(updated_payouts.len(), 4);
assert_eq!(updated_payouts[0].status, PayoutStatus::Success);
assert_eq!(updated_payouts[1].status, PayoutStatus::Cancelled);
assert_eq!(updated_payouts[2].status, PayoutStatus::Failed);
assert_eq!(updated_payouts[3].status, PayoutStatus::InTransit);
})
.await;
}
#[actix_rt::test]
async fn test_sync_pending_payouts_from_mural_handles_missing_platform_id()
{
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
let db = &env.db.pool;
// Setup test data with null platform_id
sqlx::query!(
"
INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)
VALUES ($1, $2, NULL, $3, $4, 10.00, NOW())
",
1,
PayoutMethodType::MuralPay.as_str(),
PayoutStatus::InTransit.as_str(),
1i64, // user_id
)
.execute(db)
.await
.unwrap();
let mock_client = create_mock_muralpay();
// Run the function - should not fail even with null platform_id
sync_pending_payouts_from_mural(
db,
&mock_client,
10,
)
.await.unwrap();
}).await;
}
#[actix_rt::test]
async fn test_sync_failed_mural_payouts_to_labrinth_success() {
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
let db = &env.db.pool;
// Setup test data
let uuid1 = uuid::Uuid::new_v4().to_string();
let uuid2 = uuid::Uuid::new_v4().to_string();
let uuid3 = uuid::Uuid::new_v4().to_string();
setup_test_db_with_payouts(
db,
vec![
(1, uuid1.clone(), PayoutStatus::InTransit), // Will be updated to cancelled
(2, uuid2.clone(), PayoutStatus::Success), // Will be updated to failed
(3, uuid3.clone(), PayoutStatus::Success), // Will remain unchanged
],
)
.await
.unwrap();
// Setup mock client
let mut mock = MuralPayMock::default();
// Create mock payout requests
let payout1 = create_mock_payout_request(
&uuid1,
muralpay::PayoutStatus::Canceled,
);
let payout2 = create_mock_payout_request(
&uuid2,
muralpay::PayoutStatus::Failed,
);
let payout3 = create_mock_payout_request(
&uuid::Uuid::new_v4().to_string(),
muralpay::PayoutStatus::Failed,
); // No matching DB record
// Mock search_payout_requests
mock.search_payout_requests = Box::new(move |_filter, _params| {
Ok(muralpay::SearchResponse {
total: 3,
results: vec![
payout1.clone(),
payout2.clone(),
payout3.clone(),
],
next_id: None,
})
});
let mock_client = MuralPay::from_mock(mock);
// Run the function
let result =
sync_failed_mural_payouts_to_labrinth(db, &mock_client, 10)
.await;
assert!(result.is_ok());
// Verify results
let updated_payouts = sqlx::query!(
r#"
SELECT
id,
status AS "status: PayoutStatus"
FROM payouts
ORDER BY id
"#
)
.fetch_all(db)
.await
.unwrap();
assert_eq!(updated_payouts.len(), 3);
assert_eq!(updated_payouts[0].status, PayoutStatus::Cancelled); // search_req_1 -> canceled
assert_eq!(updated_payouts[1].status, PayoutStatus::Failed); // search_req_2 -> failed
assert_eq!(updated_payouts[2].status, PayoutStatus::Success); // search_req_3 unchanged
})
.await;
}
#[actix_rt::test]
async fn test_sync_failed_mural_payouts_to_labrinth_handles_wrong_status() {
with_test_environment(None, |env: TestEnvironment<ApiV3>| async move {
let db = &env.db.pool;
// Setup test data
let uuid1 = uuid::Uuid::new_v4().to_string();
setup_test_db_with_payouts(
db,
vec![(1, uuid1.clone(), PayoutStatus::InTransit)],
)
.await
.unwrap();
// Setup mock client with a payout that has unexpected status
let mut mock = MuralPayMock::default();
let payout1 = create_mock_payout_request(
&uuid1,
muralpay::PayoutStatus::Pending,
); // Should be filtered out
// Mock search_payout_requests
mock.search_payout_requests = Box::new(move |_filter, _params| {
Ok(muralpay::SearchResponse {
total: 1,
results: vec![payout1.clone()],
next_id: None,
})
});
let mock_client = MuralPay::from_mock(mock);
// Run the function - should handle this gracefully
sync_failed_mural_payouts_to_labrinth(
db,
&mock_client,
10,
)
.await
.unwrap();
// Verify status remains unchanged
let payout =
sqlx::query!(r#"SELECT status AS "status: PayoutStatus" FROM payouts WHERE id = 1"#)
.fetch_one(db)
.await
.unwrap();
assert_eq!(payout.status, PayoutStatus::InTransit); // Unchanged
})
.await;
}
}