diff --git a/apps/labrinth/.sqlx/query-9152c0d7e7f508491b601c16c6eed05e2333475e96007180acda6086ee2825c0.json b/apps/labrinth/.sqlx/query-9152c0d7e7f508491b601c16c6eed05e2333475e96007180acda6086ee2825c0.json new file mode 100644 index 00000000..fe4b6260 --- /dev/null +++ b/apps/labrinth/.sqlx/query-9152c0d7e7f508491b601c16c6eed05e2333475e96007180acda6086ee2825c0.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM usa.created_at)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n CASE WHEN $5 THEN affiliate_code ELSE 0 END AS affiliate_code,\n COUNT(*) AS conversions\n FROM users_subscriptions_affiliations usa\n INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code\n INNER JOIN users_subscriptions us ON us.id = usa.subscription_id\n INNER JOIN charges c ON c.subscription_id = us.id\n WHERE\n ac.affiliate = $4\n AND usa.created_at BETWEEN $1 AND $2\n AND c.status = 'succeeded'\n GROUP BY bucket, affiliate_code", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bucket", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "affiliate_code", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "conversions", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int4", + "Int8", + "Bool" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "9152c0d7e7f508491b601c16c6eed05e2333475e96007180acda6086ee2825c0" +} diff --git a/apps/labrinth/.sqlx/query-b3b3f8dd54cbec783ad4b872352117ad520507e2230482454e3caabbb153e482.json b/apps/labrinth/.sqlx/query-b3b3f8dd54cbec783ad4b872352117ad520507e2230482454e3caabbb153e482.json new file mode 100644 index 00000000..f18daddd --- /dev/null +++ b/apps/labrinth/.sqlx/query-b3b3f8dd54cbec783ad4b872352117ad520507e2230482454e3caabbb153e482.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT 1 AS exists FROM affiliate_codes WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "b3b3f8dd54cbec783ad4b872352117ad520507e2230482454e3caabbb153e482" +} diff --git a/apps/labrinth/.sqlx/query-b617ed1011341416c1c012c00e716a59873a8204e1b122c7c517a1c4437edfb4.json b/apps/labrinth/.sqlx/query-b617ed1011341416c1c012c00e716a59873a8204e1b122c7c517a1c4437edfb4.json new file mode 100644 index 00000000..eb40f08d --- /dev/null +++ b/apps/labrinth/.sqlx/query-b617ed1011341416c1c012c00e716a59873a8204e1b122c7c517a1c4437edfb4.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n CASE WHEN $5 THEN mod_id ELSE 0 END AS mod_id,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n user_id = $4\n -- only project revenue is counted here\n -- for affiliate code revenue, see `affiliate_code_revenue``\n AND payouts_values.mod_id IS NOT NULL\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, mod_id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bucket", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "mod_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "amount_sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int4", + "Int8", + "Bool" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "b617ed1011341416c1c012c00e716a59873a8204e1b122c7c517a1c4437edfb4" +} diff --git a/apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json b/apps/labrinth/.sqlx/query-eeea6cad39d645d3f5a0a4115c8350e08b7850a09a86c62d0de371a1caed7c07.json similarity index 63% rename from apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json rename to apps/labrinth/.sqlx/query-eeea6cad39d645d3f5a0a4115c8350e08b7850a09a86c62d0de371a1caed7c07.json index 607f4aa3..41ee8d25 100644 --- a/apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json +++ b/apps/labrinth/.sqlx/query-eeea6cad39d645d3f5a0a4115c8350e08b7850a09a86c62d0de371a1caed7c07.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n COALESCE(mod_id, 0) AS mod_id,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n user_id = $4\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, mod_id", + "query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS affiliate_code_source,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n user_id = $4\n AND payouts_values.affiliate_code_source IS NOT NULL\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, affiliate_code_source", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "mod_id", + "name": "affiliate_code_source", "type_info": "Int8" }, { @@ -24,7 +24,8 @@ "Timestamptz", "Timestamptz", "Int4", - "Int8" + "Int8", + "Bool" ] }, "nullable": [ @@ -33,5 +34,5 @@ null ] }, - "hash": "82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa" + "hash": "eeea6cad39d645d3f5a0a4115c8350e08b7850a09a86c62d0de371a1caed7c07" } diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index 3994118c..11907fd8 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -123,7 +123,7 @@ tracing = { workspace = true } tracing-actix-web = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } -utoipa = { workspace = true } +utoipa = { workspace = true, features = ["url"] } utoipa-actix-web = { workspace = true } utoipa-swagger-ui = { workspace = true } uuid = { workspace = true, features = ["fast-rng", "serde", "v4"] } diff --git a/apps/labrinth/src/clickhouse/mod.rs b/apps/labrinth/src/clickhouse/mod.rs index 6efe1eaa..2c3fc6da 100644 --- a/apps/labrinth/src/clickhouse/mod.rs +++ b/apps/labrinth/src/clickhouse/mod.rs @@ -135,5 +135,30 @@ pub async fn init_client_with_database( .execute() .await?; + client + .query(&format!( + " + CREATE TABLE IF NOT EXISTS {database}.affiliate_code_clicks {cluster_line} + ( + recorded DateTime64(4), + domain String, + + user_id UInt64, + affiliate_code_id UInt64, + + ip IPv6, + country String, + user_agent String, + headers Array(Tuple(String, String)) + ) + ENGINE = {engine} + {ttl} + PRIMARY KEY (affiliate_code_id, recorded) + SETTINGS index_granularity = 8192 + " + )) + .execute() + .await?; + Ok(client.with_database(database)) } diff --git a/apps/labrinth/src/models/v3/affiliate_code.rs b/apps/labrinth/src/models/v3/affiliate_code.rs index dfe9150f..44d1c667 100644 --- a/apps/labrinth/src/models/v3/affiliate_code.rs +++ b/apps/labrinth/src/models/v3/affiliate_code.rs @@ -4,42 +4,6 @@ use serde::{Deserialize, Serialize}; use crate::models::ids::AffiliateCodeId; -/// Affiliate code used to track referral purchases. -/// -/// See [`AffiliateCode`]. -/// -/// This struct contains information which should only be visible to admins. -#[derive(Serialize, Deserialize)] -pub struct AdminAffiliateCode { - /// Affiliate code ID. - pub id: AffiliateCodeId, - /// When the code was created. - pub created_at: DateTime, - /// User who created the code. - pub created_by: UserId, - /// User who refers the purchaser. - pub affiliate: UserId, - /// Affiliate-defined name for this affiliate code - where the click came - /// from. - pub source_name: String, -} - -impl From - for AdminAffiliateCode -{ - fn from( - data: crate::database::models::affiliate_code_item::DBAffiliateCode, - ) -> Self { - Self { - id: data.id.into(), - created_at: data.created_at, - created_by: data.created_by.into(), - affiliate: data.affiliate.into(), - source_name: data.source_name, - } - } -} - /// Affiliate code used to track referral purchases. /// /// When a user follows a URL with [`AffiliateCode::id`] as an affiliate @@ -49,10 +13,14 @@ impl From /// /// This struct contains information which is allowed to be seen by an /// affiliate. -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct AffiliateCode { /// Affiliate code ID. pub id: AffiliateCodeId, + /// When the code was created. + pub created_at: Option>, + /// User who created the code. + pub created_by: Option, /// User who refers the purchaser. pub affiliate: UserId, /// Affiliate-defined name for this affiliate code - where the click came @@ -60,14 +28,23 @@ pub struct AffiliateCode { pub source_name: String, } -impl From - for AffiliateCode -{ - fn from( +impl AffiliateCode { + pub fn from( data: crate::database::models::affiliate_code_item::DBAffiliateCode, + is_admin: bool, ) -> Self { Self { id: data.id.into(), + created_at: if is_admin { + Some(data.created_at) + } else { + None + }, + created_by: if is_admin { + Some(data.created_by.into()) + } else { + None + }, affiliate: data.affiliate.into(), source_name: data.source_name, } diff --git a/apps/labrinth/src/models/v3/analytics.rs b/apps/labrinth/src/models/v3/analytics.rs index 6296d7e0..44805f15 100644 --- a/apps/labrinth/src/models/v3/analytics.rs +++ b/apps/labrinth/src/models/v3/analytics.rs @@ -45,6 +45,21 @@ pub struct PageView { pub headers: Vec<(String, String)>, } +#[derive(Debug, Row, Serialize, Deserialize, Clone, Eq, PartialEq, Hash)] +pub struct AffiliateCodeClick { + pub recorded: i64, + pub domain: String, + + // Modrinth User ID for logged in users + pub user_id: u64, + pub affiliate_code_id: u64, + + pub ip: Ipv6Addr, + pub country: String, + pub user_agent: String, + pub headers: Vec<(String, String)>, +} + #[derive(Row, Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)] pub struct Playtime { pub recorded: i64, diff --git a/apps/labrinth/src/queue/analytics.rs b/apps/labrinth/src/queue/analytics.rs index d44f8a2b..bb0373cb 100644 --- a/apps/labrinth/src/queue/analytics.rs +++ b/apps/labrinth/src/queue/analytics.rs @@ -1,6 +1,8 @@ use crate::database::models::DatabaseError; use crate::database::redis::RedisPool; -use crate::models::analytics::{Download, PageView, Playtime}; +use crate::models::analytics::{ + AffiliateCodeClick, Download, PageView, Playtime, +}; use crate::routes::ApiError; use dashmap::{DashMap, DashSet}; use redis::cmd; @@ -14,6 +16,7 @@ pub struct AnalyticsQueue { views_queue: DashMap<(u64, u64), Vec>, downloads_queue: DashMap<(u64, u64), Download>, playtime_queue: DashSet, + affiliate_code_clicks_queue: DashMap<(u64, u64), Vec>, } impl Default for AnalyticsQueue { @@ -29,6 +32,7 @@ impl AnalyticsQueue { views_queue: DashMap::with_capacity(1000), downloads_queue: DashMap::with_capacity(1000), playtime_queue: DashSet::with_capacity(1000), + affiliate_code_clicks_queue: DashMap::with_capacity(1000), } } @@ -50,6 +54,13 @@ impl AnalyticsQueue { self.playtime_queue.insert(playtime); } + pub fn add_affiliate_code_click(&self, click: AffiliateCodeClick) { + self.affiliate_code_clicks_queue + .entry((click.user_id, click.affiliate_code_id)) + .or_default() + .push(click); + } + pub async fn index( &self, client: clickhouse::Client, @@ -65,6 +76,24 @@ impl AnalyticsQueue { let playtime_queue = self.playtime_queue.clone(); self.playtime_queue.clear(); + let affiliate_code_clicks_queue = + self.affiliate_code_clicks_queue.clone(); + self.affiliate_code_clicks_queue.clear(); + + if !affiliate_code_clicks_queue.is_empty() { + let mut insert_clicks = client + .insert::("affiliate_code_clicks") + .await?; + + for (_, click_vec) in affiliate_code_clicks_queue { + for click in click_vec { + insert_clicks.write(&click).await?; + } + } + + insert_clicks.end().await?; + } + if !playtime_queue.is_empty() { let mut playtimes = client.insert::("playtime").await?; diff --git a/apps/labrinth/src/routes/internal/affiliate.rs b/apps/labrinth/src/routes/internal/affiliate.rs index c48cc5af..18355d5a 100644 --- a/apps/labrinth/src/routes/internal/affiliate.rs +++ b/apps/labrinth/src/routes/internal/affiliate.rs @@ -1,3 +1,5 @@ +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc}; + use crate::{ auth::get_user_from_headers, database::{ @@ -5,38 +7,148 @@ use crate::{ redis::RedisPool, }, models::{ - ids::AffiliateCodeId, - pats::Scopes, - users::Badges, - v3::affiliate_code::{AdminAffiliateCode, AffiliateCode}, + analytics::AffiliateCodeClick, ids::AffiliateCodeId, pats::Scopes, + users::Badges, v3::affiliate_code::AffiliateCode, + }, + queue::{analytics::AnalyticsQueue, session::AuthQueue}, + routes::analytics::FILTERED_HEADERS, + util::{ + date::get_current_tenths_of_ms, env::parse_strings_from_var, + error::Context, }, - queue::session::AuthQueue, }; -use actix_web::{HttpRequest, HttpResponse, web}; +use actix_web::{HttpRequest, delete, get, patch, post, put, web}; use ariadne::ids::UserId; use chrono::Utc; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use tracing::trace; +use url::Url; use crate::routes::ApiError; -pub fn config(cfg: &mut web::ServiceConfig) { - cfg.service( - web::scope("affiliate") - .route("", web::get().to(get_all)) - .route("", web::put().to(create)) - .route("/{id}", web::get().to(get)) - .route("/{id}", web::delete().to(delete)) - .route("/{id}", web::patch().to(patch)), - ); +pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) { + cfg.service(ingest_click) + .service(get_all) + .service(create) + .service(get) + .service(delete) + .service(patch); } +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct IngestClick { + pub url: Url, + pub affiliate_code_id: AffiliateCodeId, +} + +#[utoipa::path] +#[post("/ingest-click")] +async fn ingest_click( + req: HttpRequest, + web::Json(ingest_click): web::Json, + pool: web::Data, + redis: web::Data, + session_queue: web::Data, + analytics_queue: web::Data>, +) -> Result<(), ApiError> { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::empty(), + ) + .await + .map(|(_, user)| user) + .ok(); + let conn_info = req.connection_info().peer_addr().map(|x| x.to_string()); + + let url = ingest_click.url; + let domain = url.host_str().ok_or_else(|| { + ApiError::InvalidInput("invalid page view URL specified!".to_string()) + })?; + let url_origin = url.origin().ascii_serialization(); + + let is_valid_url_origin = + parse_strings_from_var("ANALYTICS_ALLOWED_ORIGINS") + .unwrap_or_default() + .iter() + .any(|origin| origin == "*" || url_origin == *origin); + + if !is_valid_url_origin { + return Err(ApiError::InvalidInput( + "invalid page view URL specified!".to_string(), + )); + } + + let exists = sqlx::query!( + " + SELECT 1 AS exists FROM affiliate_codes WHERE id = $1 + ", + DBAffiliateCodeId::from(ingest_click.affiliate_code_id) as _ + ) + .fetch_optional(&**pool) + .await + .wrap_internal_err("failed to check if code exists")?; + if exists.is_none() { + // don't allow enumerating affiliate codes + return Ok(()); + } + + let headers = req + .headers() + .into_iter() + .map(|(key, val)| { + ( + key.to_string().to_lowercase(), + val.to_str().unwrap_or_default().to_string(), + ) + }) + .collect::>(); + + let ip = crate::util::ip::convert_to_ip_v6( + if let Some(header) = headers.get("cf-connecting-ip") { + header + } else { + conn_info.as_deref().unwrap_or_default() + }, + ) + .unwrap_or_else(|_| Ipv4Addr::new(127, 0, 0, 1).to_ipv6_mapped()); + + let click = AffiliateCodeClick { + recorded: get_current_tenths_of_ms(), + domain: domain.to_string(), + user_id: user.map(|user| user.id.0).unwrap_or_default(), + affiliate_code_id: ingest_click.affiliate_code_id.0, + ip, + country: headers + .get("cf-ipcountry") + .map(|x| x.to_string()) + .unwrap_or_default(), + user_agent: headers.get("user-agent").cloned().unwrap_or_default(), + headers: headers + .into_iter() + .filter(|x| !FILTERED_HEADERS.contains(&&*x.0)) + .collect(), + }; + + trace!("Ingested affiliate code click {click:?}"); + analytics_queue.add_affiliate_code_click(click); + + Ok(()) +} + +#[utoipa::path( + responses((status = OK, body = inline(Vec))) +)] +#[get("")] async fn get_all( req: HttpRequest, pool: web::Data, redis: web::Data, session_queue: web::Data, -) -> Result { +) -> Result>, ApiError> { let (_, user) = get_user_from_headers( &req, &**pool, @@ -47,21 +159,24 @@ async fn get_all( .await?; if user.role.is_admin() { - let codes = DBAffiliateCode::get_all(&**pool).await?; + let codes = DBAffiliateCode::get_all(&**pool) + .await + .wrap_internal_err("failed to get all affiliate codes")?; let codes = codes .into_iter() - .map(AdminAffiliateCode::from) + .map(|code| AffiliateCode::from(code, true)) .collect::>(); - Ok(HttpResponse::Ok().json(codes)) + Ok(web::Json(codes)) } else if user.badges.contains(Badges::AFFILIATE) { let codes = DBAffiliateCode::get_by_affiliate(DBUserId::from(user.id), &**pool) - .await?; + .await + .wrap_internal_err("failed to get all affiliate codes")?; let codes = codes .into_iter() - .map(AffiliateCode::from) + .map(|code| AffiliateCode::from(code, false)) .collect::>(); - Ok(HttpResponse::Ok().json(codes)) + Ok(web::Json(codes)) } else { Err(ApiError::CustomAuthentication( "You do not have permission to view affiliate codes!".to_string(), @@ -69,19 +184,23 @@ async fn get_all( } } -#[derive(Deserialize)] -struct CreateRequest { - affiliate: Option, - source_name: String, +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct CreateRequest { + pub affiliate: Option, + pub source_name: String, } +#[utoipa::path( + responses((status = OK, body = inline(AffiliateCode))) +)] +#[put("")] async fn create( req: HttpRequest, pool: web::Data, redis: web::Data, session_queue: web::Data, body: web::Json, -) -> Result { +) -> Result, ApiError> { let (_, creator) = get_user_from_headers( &req, &**pool, @@ -135,24 +254,29 @@ async fn create( affiliate: affiliate_id, source_name: body.source_name.clone(), }; - code.insert(&mut *transaction).await?; + code.insert(&mut *transaction) + .await + .wrap_internal_err("failed to insert affiliate code")?; - transaction.commit().await?; + transaction + .commit() + .await + .wrap_internal_err("failed to commit transaction")?; - if is_admin { - Ok(HttpResponse::Created().json(AdminAffiliateCode::from(code))) - } else { - Ok(HttpResponse::Created().json(AffiliateCode::from(code))) - } + Ok(web::Json(AffiliateCode::from(code, is_admin))) } +#[utoipa::path( + responses((status = OK, body = inline(AffiliateCode))) +)] +#[get("/{id}")] async fn get( req: HttpRequest, path: web::Path<(AffiliateCodeId,)>, pool: web::Data, redis: web::Data, session_queue: web::Data, -) -> Result { +) -> Result, ApiError> { let (_, user) = get_user_from_headers( &req, &**pool, @@ -172,11 +296,7 @@ async fn get( let is_owner = model.affiliate == DBUserId::from(user.id); if is_admin || is_owner { - if is_admin { - Ok(HttpResponse::Ok().json(AdminAffiliateCode::from(model))) - } else { - Ok(HttpResponse::Ok().json(AffiliateCode::from(model))) - } + Ok(web::Json(AffiliateCode::from(model, is_admin))) } else { Err(ApiError::NotFound) } @@ -185,13 +305,15 @@ async fn get( } } +#[utoipa::path] +#[delete("/{id}")] async fn delete( req: HttpRequest, path: web::Path<(AffiliateCodeId,)>, pool: web::Data, redis: web::Data, session_queue: web::Data, -) -> Result { +) -> Result<(), ApiError> { let (_, user) = get_user_from_headers( &req, &**pool, @@ -214,7 +336,7 @@ async fn delete( let result = DBAffiliateCode::remove(affiliate_code_id, &**pool).await?; if result.is_some() { - Ok(HttpResponse::NoContent().finish()) + Ok(()) } else { Err(ApiError::NotFound) } @@ -226,11 +348,13 @@ async fn delete( } } -#[derive(Deserialize)] -struct PatchRequest { - source_name: String, +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct PatchRequest { + pub source_name: String, } +#[utoipa::path] +#[patch("/{id}")] async fn patch( req: HttpRequest, path: web::Path<(AffiliateCodeId,)>, @@ -238,7 +362,7 @@ async fn patch( redis: web::Data, session_queue: web::Data, body: web::Json, -) -> Result { +) -> Result<(), ApiError> { let (_, user) = get_user_from_headers( &req, &**pool, @@ -273,7 +397,8 @@ async fn patch( &body.source_name, &**pool, ) - .await?; + .await + .wrap_internal_err("failed to update affiliate code source name")?; - Ok(HttpResponse::NoContent().finish()) + Ok(()) } diff --git a/apps/labrinth/src/routes/internal/mod.rs b/apps/labrinth/src/routes/internal/mod.rs index f15da09f..7151f521 100644 --- a/apps/labrinth/src/routes/internal/mod.rs +++ b/apps/labrinth/src/routes/internal/mod.rs @@ -31,7 +31,6 @@ pub fn config(cfg: &mut actix_web::web::ServiceConfig) { .configure(statuses::config) .configure(medal::config) .configure(external_notifications::config) - .configure(affiliate::config) .configure(mural::config), ); } @@ -43,5 +42,10 @@ pub fn utoipa_config( utoipa_actix_web::scope("/_internal/moderation") .wrap(default_cors()) .configure(moderation::config), + ) + .service( + utoipa_actix_web::scope("/_internal/affiliate") + .wrap(default_cors()) + .configure(affiliate::config), ); } diff --git a/apps/labrinth/src/routes/v3/analytics_get.rs b/apps/labrinth/src/routes/v3/analytics_get.rs index 924bcae5..e3beec55 100644 --- a/apps/labrinth/src/routes/v3/analytics_get.rs +++ b/apps/labrinth/src/routes/v3/analytics_get.rs @@ -13,6 +13,7 @@ use std::num::NonZeroU64; use actix_web::{HttpRequest, post, web}; use chrono::{DateTime, TimeDelta, Utc}; +use eyre::eyre; use futures::StreamExt; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -22,16 +23,20 @@ use crate::{ auth::{AuthenticationError, get_user_from_headers}, database::{ self, DBProject, - models::{DBProjectId, DBUser, DBUserId, DBVersionId}, + models::{ + DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBUserId, + DBVersionId, + }, redis::RedisPool, }, models::{ - ids::{ProjectId, VersionId}, + ids::{AffiliateCodeId, ProjectId, VersionId}, pats::Scopes, teams::ProjectPermissions, }, queue::session::AuthQueue, routes::ApiError, + util::error::Context, }; pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) { @@ -93,7 +98,14 @@ pub struct ReturnMetrics { /// How long users have been playing a project. pub project_playtime: Option>, /// How much payout revenue a project has generated. - pub project_revenue: Option>, + pub project_revenue: Option>, + /// How many times an affiliate code has been clicked. + pub affiliate_code_clicks: Option>, + /// How many times a product has been purchased with an affiliate code. + pub affiliate_code_conversions: + Option>, + /// How much payout revenue an affiliate code has generated. + pub affiliate_code_revenue: Option>, } /// Replacement for `()` because of a `utoipa` limitation. @@ -176,6 +188,46 @@ pub enum ProjectPlaytimeField { GameVersion, } +/// Fields for [`ReturnMetrics::project_revenue`]. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum ProjectRevenueField { + /// Project ID. + ProjectId, +} + +/// Fields for [`ReturnMetrics::affiliate_code_clicks`]. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum AffiliateCodeClicksField { + /// Affiliate code ID. + AffiliateCodeId, +} + +/// Fields for [`ReturnMetrics::affiliate_code_conversions`]. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum AffiliateCodeConversionsField { + /// Affiliate code ID. + AffiliateCodeId, +} + +/// Fields for [`ReturnMetrics::affiliate_code_revenue`]. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum AffiliateCodeRevenueField { + /// Affiliate code ID. + AffiliateCodeId, +} + /// Minimum width of a [`TimeSlice`], controlled by [`TimeRange::resolution`]. pub const MIN_RESOLUTION: TimeDelta = TimeDelta::minutes(60); @@ -203,24 +255,17 @@ pub struct TimeSlice(pub Vec); pub enum AnalyticsData { /// Project metrics. Project(ProjectAnalytics), - // AffiliateCode(AffiliateCodeAnalytics), + AffiliateCode(AffiliateCodeAnalytics), } /// Project metrics. #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct ProjectAnalytics { /// What project these metrics are for. - source_project: ProjectId, + pub source_project: ProjectId, /// Metrics collected. #[serde(flatten)] - metrics: ProjectMetrics, -} - -impl ProjectAnalytics { - /// Get the project ID for these analytics. - pub fn project_id(&self) -> &ProjectId { - &self.source_project - } + pub metrics: ProjectMetrics, } /// Project metrics of a specific kind. @@ -300,11 +345,55 @@ pub struct ProjectRevenue { revenue: Decimal, } +/// Affiliate code metrics. +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct AffiliateCodeAnalytics { + /// What affiliate code these metrics are for. + pub source_affiliate_code: AffiliateCodeId, + /// Metrics collected. + #[serde(flatten)] + pub metrics: AffiliateCodeMetrics, +} + +/// Affiliate code metrics of a specific kind. +/// +/// If a field is not included in [`Metrics::bucket_by`], it will be [`None`]. +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case", tag = "metric_kind")] +pub enum AffiliateCodeMetrics { + Clicks(AffiliateCodeClicks), + Conversions(AffiliateCodeConversions), + Revenue(AffiliateCodeRevenue), +} + +/// [`ReturnMetrics::affiliate_code_clicks`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)] +pub struct AffiliateCodeClicks { + /// Total clicks for this bucket. + pub clicks: u64, +} + +/// [`ReturnMetrics::affiliate_code_conversions`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)] +pub struct AffiliateCodeConversions { + /// Total conversions for this bucket. + pub conversions: u64, +} + +/// [`ReturnMetrics::affiliate_code_revenue`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)] +pub struct AffiliateCodeRevenue { + /// Total revenue for this bucket. + pub revenue: Decimal, +} + // logic /// Clickhouse queries - separate from [`sqlx`] queries. mod query { - use crate::database::models::{DBProjectId, DBVersionId}; + use crate::database::models::{ + DBAffiliateCodeId, DBProjectId, DBVersionId, + }; use const_format::formatcp; const TIME_RANGE_START: &str = "{time_range_start: UInt64}"; @@ -346,8 +435,8 @@ mod query { -- not the possibly-zero one, -- by using `views.project_id` instead of `project_id` AND views.project_id IN {PROJECT_IDS} - GROUP BY - bucket, project_id, domain, site_path, monetized, country" + GROUP BY bucket, project_id, domain, site_path, monetized, country + " ) }; @@ -385,8 +474,7 @@ mod query { -- not the possibly-zero one, -- by using `downloads.project_id` instead of `project_id` AND downloads.project_id IN {PROJECT_IDS} - GROUP BY - bucket, project_id, domain, site_path, version_id, country" + GROUP BY bucket, project_id, domain, site_path, version_id, country" ) }; @@ -421,8 +509,34 @@ mod query { -- not the possibly-zero one, -- by using `playtime.project_id` instead of `project_id` AND playtime.project_id IN {PROJECT_IDS} - GROUP BY - bucket, project_id, version_id, loader, game_version" + GROUP BY bucket, project_id, version_id, loader, game_version" + ) + }; + + #[derive(Debug, clickhouse::Row, serde::Deserialize)] + pub struct AffiliateCodeClickRow { + pub bucket: u64, + pub affiliate_code_id: DBAffiliateCodeId, + pub clicks: u64, + } + + pub const AFFILIATE_CODE_CLICKS: &str = { + const USE_AFFILIATE_CODE_ID: &str = "{use_affiliate_code_id: Bool}"; + const AFFILIATE_CODE_IDS: &str = "{affiliate_code_ids: Array(UInt64)}"; + + formatcp!( + "SELECT + widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket, + if({USE_AFFILIATE_CODE_ID}, affiliate_code_id, 0) AS affiliate_code_id, + COUNT(*) AS clicks + FROM affiliate_code_clicks + WHERE + recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END} + -- make sure that the REAL affiliate code id is included, + -- not the possibly-zero one, + -- by using `affiliate_code_clicks.affiliate_code_id` instead of `project_id` + -- AND affiliate_code_clicks.affiliate_code_id IN {AFFILIATE_CODE_IDS} + GROUP BY bucket, affiliate_code_id" ) }; } @@ -486,12 +600,12 @@ pub async fn fetch_analytics( }; if num_time_slices > MAX_TIME_SLICES { - return Err(ApiError::InvalidInput(format!( + return Err(ApiError::Request(eyre!( "Resolution is too fine or range is too large - maximum of {MAX_TIME_SLICES} time slices, was {num_time_slices}" ))); } if resolution < MIN_RESOLUTION { - return Err(ApiError::InvalidInput(format!( + return Err(ApiError::Request(eyre!( "Resolution must be at least {MIN_RESOLUTION}, was {resolution}", ))); } @@ -505,11 +619,19 @@ pub async fn fetch_analytics( let project_ids = filter_allowed_project_ids(&project_ids, &user, &pool, &redis).await?; + let affiliate_code_ids = + DBAffiliateCode::get_by_affiliate(user.id.into(), &**pool) + .await? + .into_iter() + .map(|code| code.id) + .collect::>(); + let mut query_clickhouse_cx = QueryClickhouseContext { clickhouse: &clickhouse, req: &req, time_slices: &mut time_slices, project_ids: &project_ids, + affiliate_code_ids: &affiliate_code_ids, }; if let Some(metrics) = &req.return_metrics.project_views { @@ -617,7 +739,30 @@ pub async fn fetch_analytics( .await?; } - if req.return_metrics.project_revenue.is_some() { + if let Some(metrics) = &req.return_metrics.affiliate_code_clicks { + use AffiliateCodeClicksField as F; + let uses = |field| metrics.bucket_by.contains(&field); + + tracing::info!("affiliate codes = {affiliate_code_ids:?}"); + + query_clickhouse::( + &mut query_clickhouse_cx, + query::AFFILIATE_CODE_CLICKS, + &[("use_affiliate_code_id", uses(F::AffiliateCodeId))], + |row| row.bucket, + |row| { + AnalyticsData::AffiliateCode(AffiliateCodeAnalytics { + source_affiliate_code: row.affiliate_code_id.into(), + metrics: AffiliateCodeMetrics::Clicks( + AffiliateCodeClicks { clicks: row.clicks }, + ), + }) + }, + ) + .await?; + } + + if let Some(metrics) = &req.return_metrics.project_revenue { if !scopes.contains(Scopes::PAYOUTS_READ) { return Err(AuthenticationError::InvalidCredentials.into()); } @@ -630,29 +775,29 @@ pub async fn fetch_analytics( EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint, $3::integer ) AS bucket, - COALESCE(mod_id, 0) AS mod_id, + CASE WHEN $5 THEN mod_id ELSE 0 END AS mod_id, SUM(amount) amount_sum FROM payouts_values WHERE user_id = $4 + -- only project revenue is counted here + -- for affiliate code revenue, see `affiliate_code_revenue`` + AND payouts_values.mod_id IS NOT NULL AND created BETWEEN $1 AND $2 GROUP BY bucket, mod_id", req.time_range.start, req.time_range.end, num_time_slices as i64, DBUserId::from(user.id) as DBUserId, + metrics.bucket_by.contains(&ProjectRevenueField::ProjectId), ) .fetch(&**pool); while let Some(row) = rows.next().await.transpose()? { - let bucket = row.bucket.ok_or_else(|| { - ApiError::InvalidInput( - "bucket should be non-null - query bug!".into(), - ) - })?; - let bucket = usize::try_from(bucket).map_err(|_| { - ApiError::InvalidInput( - "bucket value {bucket} does not fit into `usize` - query bug!".into(), - ) + let bucket = row + .bucket + .wrap_internal_err("bucket should be non-null - query bug!")?; + let bucket = usize::try_from(bucket).wrap_internal_err_with(|| { + eyre!("bucket value {bucket} does not fit into `usize` - query bug!") })?; if let Some(source_project) = @@ -673,6 +818,116 @@ pub async fn fetch_analytics( } } + if let Some(metrics) = &req.return_metrics.affiliate_code_conversions { + let mut rows = sqlx::query!( + "SELECT + WIDTH_BUCKET( + EXTRACT(EPOCH FROM usa.created_at)::bigint, + EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + $3::integer + ) AS bucket, + CASE WHEN $5 THEN affiliate_code ELSE 0 END AS affiliate_code, + COUNT(*) AS conversions + FROM users_subscriptions_affiliations usa + INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code + INNER JOIN users_subscriptions us ON us.id = usa.subscription_id + INNER JOIN charges c ON c.subscription_id = us.id + WHERE + ac.affiliate = $4 + AND usa.created_at BETWEEN $1 AND $2 + AND c.status = 'succeeded' + GROUP BY bucket, affiliate_code", + req.time_range.start, + req.time_range.end, + num_time_slices as i64, + DBUserId::from(user.id) as DBUserId, + metrics.bucket_by.contains(&AffiliateCodeConversionsField::AffiliateCodeId), + ) + .fetch(&**pool); + while let Some(row) = rows.next().await.transpose()? { + let bucket = row + .bucket + .wrap_internal_err("bucket should be non-null - query bug!")?; + let bucket = usize::try_from(bucket).wrap_internal_err_with(|| { + eyre!("bucket value {bucket} does not fit into `usize` - query bug!") + })?; + + let source_affiliate_code = AffiliateCodeId::from( + DBAffiliateCodeId(row.affiliate_code.unwrap_or_default()), + ); + let conversions = + u64::try_from(row.conversions.unwrap_or_default()) + .unwrap_or(u64::MAX); + + add_to_time_slice( + &mut time_slices, + bucket, + AnalyticsData::AffiliateCode(AffiliateCodeAnalytics { + source_affiliate_code, + metrics: AffiliateCodeMetrics::Conversions( + AffiliateCodeConversions { conversions }, + ), + }), + )?; + } + } + + if let Some(metrics) = &req.return_metrics.affiliate_code_revenue { + if !scopes.contains(Scopes::PAYOUTS_READ) { + return Err(AuthenticationError::InvalidCredentials.into()); + } + + let mut rows = sqlx::query!( + "SELECT + WIDTH_BUCKET( + EXTRACT(EPOCH FROM created)::bigint, + EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + $3::integer + ) AS bucket, + CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS affiliate_code_source, + SUM(amount) amount_sum + FROM payouts_values + WHERE + user_id = $4 + AND payouts_values.affiliate_code_source IS NOT NULL + AND created BETWEEN $1 AND $2 + GROUP BY bucket, affiliate_code_source", + req.time_range.start, + req.time_range.end, + num_time_slices as i64, + DBUserId::from(user.id) as DBUserId, + metrics.bucket_by.contains(&AffiliateCodeRevenueField::AffiliateCodeId), + ) + .fetch(&**pool); + while let Some(row) = rows.next().await.transpose()? { + let bucket = row + .bucket + .wrap_internal_err("bucket should be non-null - query bug!")?; + let bucket = usize::try_from(bucket).wrap_internal_err_with(|| { + eyre!("bucket value {bucket} does not fit into `usize` - query bug!") + })?; + + let source_affiliate_code = + AffiliateCodeId::from(DBAffiliateCodeId( + row.affiliate_code_source.unwrap_or_default(), + )); + let revenue = row.amount_sum.unwrap_or_default(); + + add_to_time_slice( + &mut time_slices, + bucket, + AnalyticsData::AffiliateCode(AffiliateCodeAnalytics { + source_affiliate_code, + metrics: AffiliateCodeMetrics::Revenue( + AffiliateCodeRevenue { revenue }, + ), + }), + )?; + } + } + Ok(web::Json(FetchResponse(time_slices))) } @@ -698,6 +953,7 @@ struct QueryClickhouseContext<'a> { req: &'a GetRequest, time_slices: &'a mut [TimeSlice], project_ids: &'a [DBProjectId], + affiliate_code_ids: &'a [DBAffiliateCodeId], } async fn query_clickhouse( @@ -717,7 +973,8 @@ where .param("time_range_start", cx.req.time_range.start.timestamp()) .param("time_range_end", cx.req.time_range.end.timestamp()) .param("time_slices", cx.time_slices.len()) - .param("project_ids", cx.project_ids); + .param("project_ids", cx.project_ids) + .param("affiliate_code_ids", cx.affiliate_code_ids); for (param_name, used) in use_columns { query = query.param(param_name, used) }