From 9a8f3d7bade50002728f8b1f19a4c15c47467266 Mon Sep 17 00:00:00 2001 From: Geometrically <18202329+Geometrically@users.noreply.github.com> Date: Thu, 19 Oct 2023 09:42:49 -0700 Subject: [PATCH] Fix analytics routes + add revenue route (#734) --- sqlx-data.json | 35 +++++++ src/clickhouse/fetch.rs | 80 +++++++-------- src/routes/v2/analytics_get.rs | 179 ++++++++++++++++++++++----------- 3 files changed, 196 insertions(+), 98 deletions(-) diff --git a/sqlx-data.json b/sqlx-data.json index cdeab331..8b02bea3 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1267,6 +1267,41 @@ }, "query": "\n DELETE FROM uploaded_images\n WHERE id = $1\n " }, + "2aca8f34773d1028fb5d4cf5d3f2ab65cc3b8cea5f94bb0e1a0f632a787d708f": { + "describe": { + "columns": [ + { + "name": "mod_id", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "amount_sum", + "ordinal": 1, + "type_info": "Numeric" + }, + { + "name": "interval_start", + "ordinal": 2, + "type_info": "Timestamptz" + } + ], + "nullable": [ + true, + null, + null + ], + "parameters": { + "Left": [ + "Int8Array", + "Timestamptz", + "Timestamptz", + "Interval" + ] + } + }, + "query": "\n SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start\n FROM payouts_values\n WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3\n GROUP by mod_id, interval_start ORDER BY interval_start\n " + }, "2b8dafe9c3df9fd25235a13868e8e7607decfbe96a413cc576919a1fb510f269": { "describe": { "columns": [], diff --git a/src/clickhouse/fetch.rs b/src/clickhouse/fetch.rs index a17d69ff..62065563 100644 --- a/src/clickhouse/fetch.rs +++ b/src/clickhouse/fetch.rs @@ -4,12 +4,12 @@ use crate::{ models::ids::{ProjectId, VersionId}, routes::ApiError, }; -use chrono::NaiveDate; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] pub struct ReturnPlaytimes { - pub time: u64, + pub time: u32, pub id: u64, pub total_seconds: u64, } @@ -24,14 +24,14 @@ pub struct ReturnCountry { #[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] pub struct ReturnViews { - pub time: u64, + pub time: u32, pub id: u64, pub total_views: u64, } #[derive(clickhouse::Row, Serialize, Deserialize, Clone, Debug)] pub struct ReturnDownloads { - pub time: u64, + pub time: u32, pub id: u64, pub total_downloads: u64, } @@ -41,8 +41,8 @@ pub struct ReturnDownloads { pub async fn fetch_playtimes( projects: Option>, versions: Option>, - start_date: NaiveDate, - end_date: NaiveDate, + start_date: DateTime, + end_date: DateTime, resolution_minute: u32, client: Arc, ) -> Result, ApiError> { @@ -60,21 +60,20 @@ pub async fn fetch_playtimes( .query(&format!( " SELECT - toYYYYMMDDhhmmss(toStartOfInterval(recorded, toIntervalMinute(?)) AS time), - {project_or_version}, + toUnixTimestamp(toStartOfInterval(recorded, toIntervalMinute(?))) AS time, + {project_or_version} AS id, SUM(seconds) AS total_seconds FROM playtime - WHERE time >= toDate(?) AND time <= toDate(?) + WHERE recorded BETWEEN ? AND ? AND {project_or_version} IN ? GROUP BY time, - project_id, {project_or_version} " )) .bind(resolution_minute) - .bind(start_date) - .bind(end_date); + .bind(start_date.timestamp()) + .bind(end_date.timestamp()); if let Some(projects) = projects { query = query.bind(projects.iter().map(|x| x.0).collect::>()); @@ -89,8 +88,8 @@ pub async fn fetch_playtimes( pub async fn fetch_views( projects: Option>, versions: Option>, - start_date: NaiveDate, - end_date: NaiveDate, + start_date: DateTime, + end_date: DateTime, resolution_minutes: u32, client: Arc, ) -> Result, ApiError> { @@ -108,20 +107,19 @@ pub async fn fetch_views( .query(&format!( " SELECT - toYYYYMMDDhhmmss((toStartOfInterval(recorded, toIntervalMinute(?)) AS time)), - {project_or_version}, - count(id) AS total_views + toUnixTimestamp(toStartOfInterval(recorded, toIntervalMinute(?))) AS time, + {project_or_version} AS id, + count(views.id) AS total_views FROM views - WHERE time >= toDate(?) AND time <= toDate(?) - AND {project_or_version} IN ? + WHERE recorded BETWEEN ? AND ? + AND {project_or_version} IN ? GROUP BY - time, - {project_or_version} - " + time, {project_or_version} + " )) .bind(resolution_minutes) - .bind(start_date) - .bind(end_date); + .bind(start_date.timestamp()) + .bind(end_date.timestamp()); if let Some(projects) = projects { query = query.bind(projects.iter().map(|x| x.0).collect::>()); @@ -136,8 +134,8 @@ pub async fn fetch_views( pub async fn fetch_downloads( projects: Option>, versions: Option>, - start_date: NaiveDate, - end_date: NaiveDate, + start_date: DateTime, + end_date: DateTime, resolution_minutes: u32, client: Arc, ) -> Result, ApiError> { @@ -155,20 +153,18 @@ pub async fn fetch_downloads( .query(&format!( " SELECT - toYYYYMMDDhhmmss((toStartOfInterval(recorded, toIntervalMinute(?)) AS time)), - {project_or_version}, - count(id) AS total_downloads + toUnixTimestamp(toStartOfInterval(recorded, toIntervalMinute(?))) AS time, + {project_or_version} as id, + count(downloads.id) AS total_downloads FROM downloads - WHERE time >= toDate(?) AND time <= toDate(?) - AND {project_or_version} IN ? - GROUP BY - time, - {project_or_version} - " + WHERE recorded BETWEEN ? AND ? + AND {project_or_version} IN ? + GROUP BY time, {project_or_version} + " )) .bind(resolution_minutes) - .bind(start_date) - .bind(end_date); + .bind(start_date.timestamp()) + .bind(end_date.timestamp()); if let Some(projects) = projects { query = query.bind(projects.iter().map(|x| x.0).collect::>()); @@ -183,8 +179,8 @@ pub async fn fetch_downloads( pub async fn fetch_countries( projects: Option>, versions: Option>, - start_date: NaiveDate, - end_date: NaiveDate, + start_date: DateTime, + end_date: DateTime, client: Arc, ) -> Result, ApiError> { let project_or_version = if projects.is_some() && versions.is_none() { @@ -205,7 +201,7 @@ pub async fn fetch_countries( {project_or_version}, count(id) AS total_views FROM views - WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?)) + WHERE recorded BETWEEN ? AND ? GROUP BY country, {project_or_version} @@ -216,7 +212,7 @@ pub async fn fetch_countries( {project_or_version}, count(id) AS total_downloads FROM downloads - WHERE toYYYYMMDDhhmmss(recorded) >= toYYYYMMDDhhmmss(toDate(?)) AND toYYYYMMDDhhmmss(recorded) <= toYYYYMMDDhhmmss(toDate(?)) + WHERE recorded BETWEEN ? AND ? GROUP BY country, {project_or_version} @@ -231,7 +227,7 @@ pub async fn fetch_countries( LEFT JOIN download_grouping AS d ON (v.country = d.country) AND (v.{project_or_version} = d.{project_or_version}) WHERE {project_or_version} IN ? " - )).bind(start_date).bind(end_date).bind(start_date).bind(end_date); + )).bind(start_date.timestamp()).bind(end_date.timestamp()).bind(start_date.timestamp()).bind(end_date.timestamp()); if let Some(projects) = projects { query = query.bind(projects.iter().map(|x| x.0).collect::>()); diff --git a/src/routes/v2/analytics_get.rs b/src/routes/v2/analytics_get.rs index ae9d9b91..d6b61584 100644 --- a/src/routes/v2/analytics_get.rs +++ b/src/routes/v2/analytics_get.rs @@ -13,10 +13,12 @@ use crate::{ queue::session::AuthQueue, }; use actix_web::{get, web, HttpRequest, HttpResponse}; -use chrono::{Duration, NaiveDate, Utc}; +use chrono::{DateTime, Duration, Utc}; use serde::{Deserialize, Serialize}; +use sqlx::postgres::types::PgInterval; use sqlx::PgPool; use std::collections::HashMap; +use std::convert::TryInto; pub fn config(cfg: &mut web::ServiceConfig) { cfg.service( @@ -24,6 +26,7 @@ pub fn config(cfg: &mut web::ServiceConfig) { .service(playtimes_get) .service(views_get) .service(downloads_get) + .service(revenue_get) .service(countries_downloads_get) .service(countries_views_get), ); @@ -40,8 +43,8 @@ pub struct GetData { pub project_ids: Option, pub version_ids: Option, - pub start_date: Option, // defaults to 2 weeks ago - pub end_date: Option, // defaults to now + pub start_date: Option>, // defaults to 2 weeks ago + pub end_date: Option>, // defaults to now pub resolution_minutes: Option, // defaults to 1 day. Ignored in routes that do not aggregate over a resolution (eg: /countries) } @@ -72,7 +75,7 @@ pub async fn playtimes_get( pool: web::Data, redis: web::Data, ) -> Result { - let user_option = get_user_from_headers( + let user = get_user_from_headers( &req, &**pool, &redis, @@ -80,8 +83,7 @@ pub async fn playtimes_get( Some(&[Scopes::ANALYTICS]), ) .await - .map(|x| x.1) - .ok(); + .map(|x| x.1)?; let project_ids = data .project_ids @@ -100,17 +102,15 @@ pub async fn playtimes_get( )); } - let start_date = data - .start_date - .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); // Convert String list to list of ProjectIds or VersionIds // - Filter out unauthorized projects/versions // - If no project_ids or version_ids are provided, we default to all projects the user has access to let (project_ids, version_ids) = - filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + filter_allowed_ids(project_ids, version_ids, user, &pool, &redis).await?; // Get the views let playtimes = crate::clickhouse::fetch_playtimes( @@ -130,7 +130,7 @@ pub async fn playtimes_get( hm.insert(id_string.clone(), HashMap::new()); } if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(playtime.time.to_string(), playtime.total_seconds); + hm.insert(playtime.time, playtime.total_seconds); } } @@ -155,7 +155,7 @@ pub async fn views_get( pool: web::Data, redis: web::Data, ) -> Result { - let user_option = get_user_from_headers( + let user = get_user_from_headers( &req, &**pool, &redis, @@ -163,8 +163,7 @@ pub async fn views_get( Some(&[Scopes::ANALYTICS]), ) .await - .map(|x| x.1) - .ok(); + .map(|x| x.1)?; let project_ids = data .project_ids @@ -183,17 +182,15 @@ pub async fn views_get( )); } - let start_date = data - .start_date - .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); // Convert String list to list of ProjectIds or VersionIds // - Filter out unauthorized projects/versions // - If no project_ids or version_ids are provided, we default to all projects the user has access to let (project_ids, version_ids) = - filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + filter_allowed_ids(project_ids, version_ids, user, &pool, &redis).await?; // Get the views let views = crate::clickhouse::fetch_views( @@ -213,7 +210,7 @@ pub async fn views_get( hm.insert(id_string.clone(), HashMap::new()); } if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(views.time.to_string(), views.total_views); + hm.insert(views.time, views.total_views); } } @@ -246,8 +243,7 @@ pub async fn downloads_get( Some(&[Scopes::ANALYTICS]), ) .await - .map(|x| x.1) - .ok(); + .map(|x| x.1)?; let project_ids = data .project_ids @@ -266,10 +262,8 @@ pub async fn downloads_get( )); } - let start_date = data - .start_date - .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); // Convert String list to list of ProjectIds or VersionIds @@ -296,7 +290,88 @@ pub async fn downloads_get( hm.insert(id_string.clone(), HashMap::new()); } if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(downloads.time.to_string(), downloads.total_downloads); + hm.insert(downloads.time, downloads.total_downloads); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get payout data for a set of projects +/// Data is returned as a hashmap of project ids to a hashmap of days to amount earned per day +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 0.001 +/// } +///} +/// ONLY project IDs can be used. Unauthorized projects will be filtered out. +#[get("revenue")] +pub async fn revenue_get( + req: HttpRequest, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::PAYOUTS_READ]), + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + let resolution_minutes = data.resolution_minutes.unwrap_or(60 * 24); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let (project_ids, _) = filter_allowed_ids(project_ids, None, user, &pool, &redis).await?; + + let duration: PgInterval = Duration::minutes(resolution_minutes as i64) + .try_into() + .unwrap(); + // Get the revenue data + let payouts_values = sqlx::query!( + " + SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start + FROM payouts_values + WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3 + GROUP by mod_id, interval_start ORDER BY interval_start + ", + &project_ids.unwrap_or_default().into_iter().map(|x| x.0 as i64).collect::>(), + start_date, + end_date, + duration, + ) + .fetch_all(&**pool) + .await?; + + let mut hm = HashMap::new(); + for value in payouts_values { + if let Some(mod_id) = value.mod_id { + if let Some(amount) = value.amount_sum { + if let Some(interval_start) = value.interval_start { + let id_string = to_base62(mod_id as u64); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(interval_start.timestamp(), amount); + } + } + } } } @@ -324,7 +399,7 @@ pub async fn countries_downloads_get( pool: web::Data, redis: web::Data, ) -> Result { - let user_option = get_user_from_headers( + let user = get_user_from_headers( &req, &**pool, &redis, @@ -332,8 +407,7 @@ pub async fn countries_downloads_get( Some(&[Scopes::ANALYTICS]), ) .await - .map(|x| x.1) - .ok(); + .map(|x| x.1)?; let project_ids = data .project_ids @@ -352,16 +426,14 @@ pub async fn countries_downloads_get( )); } - let start_date = data - .start_date - .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); // Convert String list to list of ProjectIds or VersionIds // - Filter out unauthorized projects/versions // - If no project_ids or version_ids are provided, we default to all projects the user has access to let (project_ids, version_ids) = - filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + filter_allowed_ids(project_ids, version_ids, user, &pool, &redis).await?; // Get the countries let countries = crate::clickhouse::fetch_countries( @@ -408,7 +480,7 @@ pub async fn countries_views_get( pool: web::Data, redis: web::Data, ) -> Result { - let user_option = get_user_from_headers( + let user = get_user_from_headers( &req, &**pool, &redis, @@ -416,8 +488,7 @@ pub async fn countries_views_get( Some(&[Scopes::ANALYTICS]), ) .await - .map(|x| x.1) - .ok(); + .map(|x| x.1)?; let project_ids = data .project_ids @@ -436,16 +507,14 @@ pub async fn countries_views_get( )); } - let start_date = data - .start_date - .unwrap_or(Utc::now().naive_utc().date() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now().naive_utc().date()); + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); // Convert String list to list of ProjectIds or VersionIds // - Filter out unauthorized projects/versions // - If no project_ids or version_ids are provided, we default to all projects the user has access to let (project_ids, version_ids) = - filter_allowed_ids(project_ids, version_ids, user_option, &pool, &redis).await?; + filter_allowed_ids(project_ids, version_ids, user, &pool, &redis).await?; // Get the countries let countries = crate::clickhouse::fetch_countries( @@ -474,7 +543,7 @@ pub async fn countries_views_get( async fn filter_allowed_ids( mut project_ids: Option>, version_ids: Option>, - user_option: Option, + user: crate::models::users::User, pool: &web::Data, redis: &RedisPool, ) -> Result<(Option>, Option>), ApiError> { @@ -486,15 +555,13 @@ async fn filter_allowed_ids( // If no project_ids or version_ids are provided, we default to all projects the user has access to if project_ids.is_none() && version_ids.is_none() { - if let Some(user) = &user_option { - project_ids = Some( - user_item::User::get_projects(user.id.into(), &***pool, redis) - .await? - .into_iter() - .map(|x| ProjectId::from(x).to_string()) - .collect(), - ); - } + project_ids = Some( + user_item::User::get_projects(user.id.into(), &***pool, redis) + .await? + .into_iter() + .map(|x| ProjectId::from(x).to_string()) + .collect(), + ); } // Convert String list to list of ProjectIds or VersionIds @@ -507,7 +574,7 @@ async fn filter_allowed_ids( .map(|id| Ok(ProjectId(parse_base62(id)?).into())) .collect::, ApiError>>()?; let projects = project_item::Project::get_many_ids(&ids, &***pool, redis).await?; - let ids: Vec = filter_authorized_projects(projects, &user_option, pool) + let ids: Vec = filter_authorized_projects(projects, &Some(user.clone()), pool) .await? .into_iter() .map(|x| x.id) @@ -523,7 +590,7 @@ async fn filter_allowed_ids( .map(|id| Ok(VersionId(parse_base62(id)?).into())) .collect::, ApiError>>()?; let versions = version_item::Version::get_many(&ids, &***pool, redis).await?; - let ids: Vec = filter_authorized_versions(versions, &user_option, pool) + let ids: Vec = filter_authorized_versions(versions, &Some(user), pool) .await? .into_iter() .map(|x| x.id)