Integrate with Aditude API for payouts (#965)

* Integrate with Aditude API for payouts

* Update expiry

* Fix tests
This commit is contained in:
Geometrically
2024-09-12 14:52:17 -07:00
committed by GitHub
parent 5b5599128a
commit edb7e5f323
19 changed files with 483 additions and 240 deletions

View File

@@ -1,11 +1,10 @@
use crate::models::payouts::{
PayoutDecimal, PayoutInterval, PayoutMethod, PayoutMethodFee, PayoutMethodType,
};
use crate::models::projects::MonetizationStatus;
use crate::routes::ApiError;
use crate::util::env::parse_var;
use crate::{database::redis::RedisPool, models::projects::MonetizationStatus};
use base64::Engine;
use chrono::{DateTime, Datelike, Duration, Utc, Weekday};
use chrono::{DateTime, Datelike, Duration, TimeZone, Utc};
use dashmap::DashMap;
use futures::TryStreamExt;
use reqwest::Method;
@@ -511,11 +510,55 @@ impl PayoutsQueue {
}
}
pub async fn process_payout(
pool: &PgPool,
redis: &RedisPool,
client: &clickhouse::Client,
) -> Result<(), ApiError> {
#[derive(Deserialize)]
pub struct AditudePoints {
#[serde(rename = "pointsList")]
pub points_list: Vec<AditudePoint>,
}
#[derive(Deserialize)]
pub struct AditudePoint {
pub metric: AditudeMetric,
pub time: AditudeTime,
}
#[derive(Deserialize)]
pub struct AditudeMetric {
pub revenue: Option<Decimal>,
pub impressions: Option<u128>,
pub cpm: Option<Decimal>,
}
#[derive(Deserialize)]
pub struct AditudeTime {
pub seconds: u64,
}
pub async fn make_aditude_request(
metrics: &[&str],
range: &str,
interval: &str,
) -> Result<Vec<AditudePoints>, ApiError> {
let request = reqwest::Client::new()
.post("https://cloud.aditude.io/api/public/insights/metrics")
.bearer_auth(&dotenvy::var("ADITUDE_API_KEY")?)
.json(&serde_json::json!({
"metrics": metrics,
"range": range,
"interval": interval
}))
.send()
.await?
.error_for_status()?;
let text = request.text().await?;
let json: Vec<AditudePoints> = serde_json::from_str(&text)?;
Ok(json)
}
pub async fn process_payout(pool: &PgPool, client: &clickhouse::Client) -> Result<(), ApiError> {
let start: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
(Utc::now() - Duration::days(1))
.date_naive()
@@ -707,24 +750,62 @@ pub async fn process_payout(
);
}
let amount = Decimal::from(parse_var::<u64>("PAYOUTS_BUDGET").unwrap_or(0));
let aditude_res =
make_aditude_request(&["METRIC_IMPRESSIONS", "METRIC_REVENUE"], "Yesterday", "1d").await?;
let days = Decimal::from(28);
let weekdays = Decimal::from(20);
let weekend_bonus = Decimal::from(5) / Decimal::from(4);
let aditude_amount: Decimal = aditude_res
.iter()
.map(|x| {
x.points_list
.iter()
.filter_map(|x| x.metric.revenue)
.sum::<Decimal>()
})
.sum();
let aditude_impressions: u128 = aditude_res
.iter()
.map(|x| {
x.points_list
.iter()
.filter_map(|x| x.metric.impressions)
.sum::<u128>()
})
.sum();
let weekday_amount = amount / (weekdays + (weekend_bonus) * (days - weekdays));
let weekend_amount = weekday_amount * weekend_bonus;
// Modrinth's share of ad revenue
let modrinth_cut = Decimal::from(1) / Decimal::from(4);
// Clean.io fee (ad antimalware). Per 1000 impressions.
let clean_io_fee = Decimal::from(8) / Decimal::from(1000);
let payout = match start.weekday() {
Weekday::Sat | Weekday::Sun => weekend_amount,
_ => weekday_amount,
let net_revenue =
aditude_amount - (clean_io_fee * Decimal::from(aditude_impressions) / Decimal::from(1000));
let payout = net_revenue * (Decimal::from(1) - modrinth_cut);
// Ad payouts are Net 60 from the end of the month
let available = {
let now = Utc::now().date_naive();
let year = now.year();
let month = now.month();
// Get the first day of the next month
let last_day_of_month = if month == 12 {
Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap()
} else {
Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap()
};
last_day_of_month + Duration::days(59)
};
let mut clear_cache_users = Vec::new();
let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) =
(Vec::new(), Vec::new(), Vec::new(), Vec::new());
let mut update_user_balance: HashMap<i64, Decimal> = HashMap::new();
let (
mut insert_user_ids,
mut insert_project_ids,
mut insert_payouts,
mut insert_starts,
mut insert_availables,
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
for (id, project) in projects_map {
if let Some(value) = &multipliers.values.get(&(id as u64)) {
let project_multiplier: Decimal =
@@ -741,62 +822,29 @@ pub async fn process_payout(
insert_project_ids.push(id);
insert_payouts.push(payout);
insert_starts.push(start);
*update_user_balance.entry(user_id).or_default() += payout;
clear_cache_users.push(user_id);
insert_availables.push(available);
}
}
}
}
}
let (mut update_user_ids, mut update_user_balances) = (Vec::new(), Vec::new());
for (user_id, payout) in update_user_balance {
update_user_ids.push(user_id);
update_user_balances.push(payout);
}
sqlx::query!(
"
UPDATE users u
SET balance = u.balance + v.amount
FROM unnest($1::BIGINT[], $2::NUMERIC[]) AS v(id, amount)
WHERE u.id = v.id
",
&update_user_ids,
&update_user_balances
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[])
INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[])
",
&insert_user_ids[..],
&insert_project_ids[..],
&insert_payouts[..],
&insert_starts[..]
&insert_starts[..],
&insert_availables[..]
)
.execute(&mut *transaction)
.await?;
transaction.commit().await?;
if !clear_cache_users.is_empty() {
crate::database::models::User::clear_caches(
&clear_cache_users
.into_iter()
.map(|x| (crate::database::models::UserId(x), None))
.collect::<Vec<_>>(),
redis,
)
.await?;
}
Ok(())
}
@@ -806,17 +854,19 @@ pub async fn insert_payouts(
insert_project_ids: Vec<i64>,
insert_payouts: Vec<Decimal>,
insert_starts: Vec<DateTime<Utc>>,
insert_availables: Vec<DateTime<Utc>>,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<PgQueryResult> {
sqlx::query!(
"
INSERT INTO payouts_values (user_id, mod_id, amount, created)
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[])
INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[])
",
&insert_user_ids[..],
&insert_project_ids[..],
&insert_payouts[..],
&insert_starts[..]
&insert_starts[..],
&insert_availables[..],
)
.execute(&mut **transaction)
.await