diff --git a/Cargo.lock b/Cargo.lock index 877c3bb52..d6068768c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1648,6 +1648,26 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "const_format" +version = "0.2.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7faa7469a93a566e9ccc1c73fe783b4a65c274c5ace346038dca9c39fe0030ad" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d57c2eccfb16dbac1f4e61e206105db5820c9d26c3c472bc17c774259ef7744" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -4477,6 +4497,7 @@ dependencies = [ "color-eyre", "color-thief", "console-subscriber", + "const_format", "dashmap", "deadpool-redis", "dotenv-build", diff --git a/Cargo.toml b/Cargo.toml index 5f0e7bb6b..56f0201b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,7 @@ sentry = { version = "0.42.0", default-features = false, features = [ "rustls", ] } sentry-actix = "0.42.0" +const_format = "0.2.34" serde = "1.0.219" serde_bytes = "0.11.17" serde_cbor = "0.11.2" diff --git a/apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json b/apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json new file mode 100644 index 000000000..607f4aa33 --- /dev/null +++ b/apps/labrinth/.sqlx/query-82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa.json @@ -0,0 +1,37 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n COALESCE(mod_id, 0) AS mod_id,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n 
user_id = $4\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, mod_id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "bucket", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "mod_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "amount_sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Int4", + "Int8" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "82b4d6e555dd727d31cca036b923611289b509ade9e1996d711598cd14c7f8fa" +} diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index e8758e98b..0f780117f 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -62,6 +62,7 @@ bitflags.workspace = true hex.workspace = true zxcvbn.workspace = true totp-rs = { workspace = true, features = ["gen_secret"] } +const_format.workspace = true url.workspace = true urlencoding.workspace = true diff --git a/apps/labrinth/src/routes/v3/analytics_get.rs b/apps/labrinth/src/routes/v3/analytics_get.rs index 582a85ac5..ad21c1e59 100644 --- a/apps/labrinth/src/routes/v3/analytics_get.rs +++ b/apps/labrinth/src/routes/v3/analytics_get.rs @@ -1,673 +1,878 @@ -use super::ApiError; -use crate::database; -use crate::database::redis::RedisPool; -use crate::models::teams::ProjectPermissions; +//! # Design rationale +//! +//! - different metrics require different scopes +//! - views, downloads, playtime require `Scopes::ANALYTICS` +//! - revenue requires `Scopes::PAYOUTS_READ` +//! - each request returns an array of N elements; if you have to make multiple +//! requests, you have to zip together M arrays of N elements +//! 
- this makes it inconvenient to have separate endpoints + +use std::num::NonZeroU64; + +use actix_web::{HttpRequest, web}; +use chrono::{DateTime, TimeDelta, Utc}; +use futures::StreamExt; +use rust_decimal::Decimal; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; + use crate::{ - auth::get_user_from_headers, - database::models::user_item, + auth::{AuthenticationError, get_user_from_headers}, + database::{ + self, DBProject, + models::{DBProjectId, DBUser, DBUserId, DBVersionId}, + redis::RedisPool, + }, models::{ ids::{ProjectId, VersionId}, pats::Scopes, + teams::ProjectPermissions, }, queue::session::AuthQueue, + routes::ApiError, }; -use actix_web::{HttpRequest, HttpResponse, web}; -use ariadne::ids::base62_impl::to_base62; -use chrono::{DateTime, Duration, Utc}; -use eyre::eyre; -use serde::{Deserialize, Serialize}; -use sqlx::PgPool; -use sqlx::postgres::types::PgInterval; -use std::collections::HashMap; -use std::convert::TryInto; -use std::num::NonZeroU32; +// TODO: this service `analytics` is shadowed by `analytics_get_old`'s +// see the TODO in `analytics_get_old.rs` pub fn config(cfg: &mut web::ServiceConfig) { - cfg.service( - web::scope("analytics") - .route("playtime", web::get().to(playtimes_get)) - .route("views", web::get().to(views_get)) - .route("downloads", web::get().to(downloads_get)) - .route("revenue", web::get().to(revenue_get)) - .route( - "countries/downloads", - web::get().to(countries_downloads_get), - ) - .route("countries/views", web::get().to(countries_views_get)), - ); + cfg.service(web::scope("analytics").route("", web::post().to(get))); } -/// The json data to be passed to fetch analytic data -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. -/// start_date and end_date are optional, and default to two weeks ago, and the maximum date respectively. -/// resolution_minutes is optional. 
This refers to the window by which we are looking (every day, every minute, etc) and defaults to 1440 (1 day) -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct GetData { - // only one of project_ids or version_ids should be used - // if neither are provided, all projects the user has access to will be used - pub project_ids: Option, +// request - pub start_date: Option>, // defaults to 2 weeks ago - pub end_date: Option>, // defaults to now - - pub resolution_minutes: Option, // defaults to 1 day. Ignored in routes that do not aggregate over a resolution (eg: /countries) +/// Requests analytics data, aggregating over all possible analytics sources +/// like projects and affiliate codes, returning the data in a list of time +/// slices. +#[derive(Debug, Serialize, Deserialize)] +pub struct GetRequest { + /// What time range to return statistics for. + pub time_range: TimeRange, + /// What analytics metrics to return data for. + pub return_metrics: ReturnMetrics, } -/// Get playtime data for a set of projects or versions -/// Data is returned as a hashmap of project/version ids to a hashmap of days to playtime data -/// eg: -/// { -/// "4N1tEhnO": { -/// "20230824": 23 -/// } -///} -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. -#[derive(Serialize, Deserialize, Clone)] -pub struct FetchedPlaytime { - pub time: u64, - pub total_seconds: u64, - pub loader_seconds: HashMap, - pub game_version_seconds: HashMap, - pub parent_seconds: HashMap, +/// Time range for fetching analytics. +#[derive(Debug, Serialize, Deserialize)] +pub struct TimeRange { + /// When to start including data. + pub start: DateTime, + /// When to stop including data. + pub end: DateTime, + /// Determines how many time slices between the start and end will be + /// included, and how fine-grained those time slices will be. 
+ /// + /// This must fall within the bounds of [`MIN_RESOLUTION`] and + /// [`MAX_TIME_SLICES`]. + pub resolution: TimeRangeResolution, } -pub async fn playtimes_get( - req: HttpRequest, - clickhouse: web::Data, - data: web::Query, - session_queue: web::Data, - pool: web::Data, - redis: web::Data, -) -> Result { - let user = get_user_from_headers( - &req, - &**pool, - &redis, - &session_queue, - Scopes::ANALYTICS, - ) - .await - .map(|x| x.1)?; - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; +/// Determines how many time slices between the start and end will be +/// included, and how fine-grained those time slices will be. +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TimeRangeResolution { + /// Use a set number of time slices, with the resolution being determined + /// automatically. + Slices(NonZeroU64), + /// Each time slice will be a set number of minutes long, and the number of + /// slices is determined automatically. + Minutes(NonZeroU64), +} - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - let resolution_minutes = data - .resolution_minutes - .map_or(60 * 24, |minutes| minutes.get()); +/// What metrics the caller would like to receive from this analytics get +/// request. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct ReturnMetrics { + /// How many times a project page has been viewed. + pub project_views: Option>, + /// How many times a project has been downloaded. + pub project_downloads: Option>, + /// How long users have been playing a project. + pub project_playtime: Option>, + /// How much payout revenue a project has generated. 
+ pub project_revenue: Option>, +} - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = - filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; +/// See [`ReturnMetrics`]. +#[derive(Debug, Serialize, Deserialize)] +pub struct Metrics { + /// When collecting metrics, what fields do we want to group the results by? + /// + /// For example, if we have two views entries: + /// - `{ "project_id": "abcdefgh", "domain": "youtube.com", "count": 5 }` + /// - `{ "project_id": "abcdefgh", "domain": "discord.com", "count": 3 }` + /// + /// If we bucket by `domain`, then we will get two results: + /// - `{ "project_id": "abcdefgh", "domain": "youtube.com", "count": 5 }` + /// - `{ "project_id": "abcdefgh", "domain": "discord.com", "count": 3 }` + /// + /// If we do not bucket by `domain`, we will only get one, which is an + /// aggregate of the two rows: + /// - `{ "project_id": "abcdefgh", "count": 8 }` + #[serde(default = "Vec::default")] + pub bucket_by: Vec, +} - // Get the views - let playtimes = crate::clickhouse::fetch_playtimes( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; +/// Fields for [`ReturnMetrics::project_views`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ProjectViewsField { + /// Project ID. + ProjectId, + /// Referrer domain which linked to this project. + Domain, + /// Modrinth site path which was visited, e.g. `/mod/foo`. + SitePath, + /// Whether these views were monetized or not. + Monetized, + /// What country these views came from. + /// + /// To anonymize the data, the country may be reported as `XX`. 
+ Country, +} - let mut hm = HashMap::new(); - for playtime in playtimes { - let id_string = to_base62(playtime.id); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(playtime.time, playtime.total); - } +/// Fields for [`ReturnMetrics::project_downloads`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ProjectDownloadsField { + /// Project ID. + ProjectId, + /// Version ID of this project. + VersionId, + /// Referrer domain which linked to this project. + Domain, + /// Modrinth site path which was visited, e.g. `/mod/foo`. + SitePath, + /// What country these downloads came from. + /// + /// To anonymize the data, the country may be reported as `XX`. + Country, +} + +/// Fields for [`ReturnMetrics::project_playtime`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ProjectPlaytimeField { + /// Project ID. + ProjectId, + /// Version ID of this project. + VersionId, + /// Game mod loader which was used to count this playtime, e.g. Fabric. + Loader, + /// Game version which this project was played on. + GameVersion, +} + +/// Minimum width of a [`TimeSlice`], controlled by [`TimeRange::resolution`]. +pub const MIN_RESOLUTION: TimeDelta = TimeDelta::minutes(60); + +/// Maximum number of [`TimeSlice`]s in a [`GetResponse`], controlled by +/// [`TimeRange::resolution`]. +pub const MAX_TIME_SLICES: usize = 1024; + +// response + +/// Response for a [`GetRequest`]. +/// +/// This is a list of N [`TimeSlice`]s, where each slice represents an equal +/// time interval of metrics collection. The number of slices is determined +/// by [`GetRequest::time_range`]. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct GetResponse(pub Vec); + +/// Single time interval of metrics collection. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct TimeSlice(pub Vec); + +/// Metrics collected in a single [`TimeSlice`]. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] // the presence of `source_project`, `source_affiliate_code` determines the kind +pub enum AnalyticsData { + /// Project metrics. + Project(ProjectAnalytics), + // AffiliateCode(AffiliateCodeAnalytics), +} + +/// Project metrics. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProjectAnalytics { + /// What project these metrics are for. + source_project: ProjectId, + /// Metrics collected. + #[serde(flatten)] + metrics: ProjectMetrics, +} + +impl ProjectAnalytics { + /// Get the project ID for these analytics. + pub fn project_id(&self) -> &ProjectId { + &self.source_project + } +} + +/// Project metrics of a specific kind. +/// +/// If a field is not included in [`Metrics::bucket_by`], it will be [`None`]. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", tag = "metric_kind")] +pub enum ProjectMetrics { + /// [`ReturnMetrics::project_views`]. + Views(ProjectViews), + /// [`ReturnMetrics::project_downloads`]. + Downloads(ProjectDownloads), + /// [`ReturnMetrics::project_playtime`]. + Playtime(ProjectPlaytime), + /// [`ReturnMetrics::project_revenue`]. + Revenue(ProjectRevenue), +} + +/// [`ReturnMetrics::project_views`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ProjectViews { + /// [`ProjectViewsField::Domain`]. + #[serde(skip_serializing_if = "Option::is_none")] + pub domain: Option, + /// [`ProjectViewsField::SitePath`]. + #[serde(skip_serializing_if = "Option::is_none")] + pub site_path: Option, + /// [`ProjectViewsField::Monetized`]. + #[serde(skip_serializing_if = "Option::is_none")] + pub monetized: Option, + /// [`ProjectViewsField::Country`]. + #[serde(skip_serializing_if = "Option::is_none")] + pub country: Option, + /// Total number of views for this bucket. 
+ pub views: u64, +} + +/// [`ReturnMetrics::project_downloads`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ProjectDownloads { + /// [`ProjectDownloadsField::Domain`]. + #[serde(skip_serializing_if = "Option::is_none")] + domain: Option, + /// [`ProjectDownloadsField::SitePath`]. + #[serde(skip_serializing_if = "Option::is_none")] + site_path: Option, + /// [`ProjectDownloadsField::VersionId`]. + #[serde(skip_serializing_if = "Option::is_none")] + version_id: Option, + /// [`ProjectDownloadsField::Country`]. + #[serde(skip_serializing_if = "Option::is_none")] + country: Option, + /// Total number of downloads for this bucket. + downloads: u64, +} + +/// [`ReturnMetrics::project_playtime`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ProjectPlaytime { + /// [`ProjectPlaytimeField::VersionId`]. + #[serde(skip_serializing_if = "Option::is_none")] + version_id: Option, + /// [`ProjectPlaytimeField::Loader`]. + #[serde(skip_serializing_if = "Option::is_none")] + loader: Option, + /// [`ProjectPlaytimeField::GameVersion`]. + #[serde(skip_serializing_if = "Option::is_none")] + game_version: Option, + /// Total number of seconds of playtime for this bucket. + seconds: u64, +} + +/// [`ReturnMetrics::project_revenue`]. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ProjectRevenue { + /// Total revenue for this bucket. + revenue: Decimal, +} + +// logic + +/// Clickhouse queries - separate from [`sqlx`] queries. 
+mod query { + use crate::database::models::{DBProjectId, DBVersionId}; + use const_format::formatcp; + + const TIME_RANGE_START: &str = "{time_range_start: UInt64}"; + const TIME_RANGE_END: &str = "{time_range_end: UInt64}"; + const TIME_SLICES: &str = "{time_slices: UInt64}"; + const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}"; + + #[derive(Debug, clickhouse::Row, serde::Deserialize)] + pub struct ViewRow { + pub bucket: u64, + pub project_id: DBProjectId, + pub domain: String, + pub site_path: String, + pub monetized: i8, + pub country: String, + pub views: u64, } - Ok(HttpResponse::Ok().json(hm)) -} + pub const VIEWS: &str = { + const USE_PROJECT_ID: &str = "{use_project_id: Bool}"; + const USE_DOMAIN: &str = "{use_domain: Bool}"; + const USE_SITE_PATH: &str = "{use_site_path: Bool}"; + const USE_MONETIZED: &str = "{use_monetized: Bool}"; + const USE_COUNTRY: &str = "{use_country: Bool}"; -/// Get view data for a set of projects or versions -/// Data is returned as a hashmap of project/version ids to a hashmap of days to views -/// eg: -/// { -/// "4N1tEhnO": { -/// "20230824": 1090 -/// } -///} -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. 
-pub async fn views_get( - req: HttpRequest, - clickhouse: web::Data, - data: web::Query, - session_queue: web::Data, - pool: web::Data, - redis: web::Data, -) -> Result { - let user = get_user_from_headers( - &req, - &**pool, - &redis, - &session_queue, - Scopes::ANALYTICS, - ) - .await - .map(|x| x.1)?; - - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; - - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - let resolution_minutes = data - .resolution_minutes - .map_or(60 * 24, |minutes| minutes.get()); - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = - filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; - - // Get the views - let views = crate::clickhouse::fetch_views( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; - - let mut hm = HashMap::new(); - for views in views { - let id_string = to_base62(views.id); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(views.time, views.total); - } - } - - Ok(HttpResponse::Ok().json(hm)) -} - -/// Get download data for a set of projects or versions -/// Data is returned as a hashmap of project/version ids to a hashmap of days to downloads -/// eg: -/// { -/// "4N1tEhnO": { -/// "20230824": 32 -/// } -///} -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. 
-pub async fn downloads_get( - req: HttpRequest, - clickhouse: web::Data, - data: web::Query, - session_queue: web::Data, - pool: web::Data, - redis: web::Data, -) -> Result { - let user_option = get_user_from_headers( - &req, - &**pool, - &redis, - &session_queue, - Scopes::ANALYTICS, - ) - .await - .map(|x| x.1)?; - - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; - - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - let resolution_minutes = data - .resolution_minutes - .map_or(60 * 24, |minutes| minutes.get()); - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = - filter_allowed_ids(project_ids, user_option, &pool, &redis, None) - .await?; - - // Get the downloads - let downloads = crate::clickhouse::fetch_downloads( - project_ids.unwrap_or_default(), - start_date, - end_date, - resolution_minutes, - clickhouse.into_inner(), - ) - .await?; - - let mut hm = HashMap::new(); - for downloads in downloads { - let id_string = to_base62(downloads.id); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(downloads.time, downloads.total); - } - } - - Ok(HttpResponse::Ok().json(hm)) -} - -/// Get payout data for a set of projects -/// Data is returned as a hashmap of project ids to a hashmap of days to amount earned per day -/// eg: -/// { -/// "4N1tEhnO": { -/// "20230824": 0.001 -/// } -///} -/// ONLY project IDs can be used. Unauthorized projects will be filtered out. 
-pub async fn revenue_get( - req: HttpRequest, - data: web::Query, - session_queue: web::Data, - pool: web::Data, - redis: web::Data, -) -> Result { - let user = get_user_from_headers( - &req, - &**pool, - &redis, - &session_queue, - Scopes::PAYOUTS_READ, - ) - .await - .map(|x| x.1)?; - - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; - - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - let resolution_minutes = data - .resolution_minutes - .map_or(60 * 24, |minutes| minutes.get()); - - // Round up/down to nearest duration as we are using pgadmin, does not have rounding in the fetch command - // Round start_date down to nearest resolution - let diff = start_date.timestamp() % (resolution_minutes as i64 * 60); - let start_date = start_date - Duration::seconds(diff); - - // Round end_date up to nearest resolution - let diff = end_date.timestamp() % (resolution_minutes as i64 * 60); - let end_date = - end_date + Duration::seconds((resolution_minutes as i64 * 60) - diff); - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = filter_allowed_ids( - project_ids, - user.clone(), - &pool, - &redis, - Some(true), - ) - .await?; - - let duration: PgInterval = Duration::minutes(resolution_minutes as i64) - .try_into() - .map_err(|_| { - ApiError::Request(eyre!("Invalid `resolution_minutes`")) - })?; - // Get the revenue data - let project_ids = project_ids.unwrap_or_default(); - - struct PayoutValue { - mod_id: Option, - amount_sum: Option, - interval_start: Option>, - } - - let payouts_values = if project_ids.is_empty() { - sqlx::query!( - " - SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS 
interval_start - FROM payouts_values - WHERE user_id = $1 AND created BETWEEN $2 AND $3 - GROUP by mod_id, interval_start ORDER BY interval_start - ", - user.id.0 as i64, - start_date, - end_date, - duration, + formatcp!( + "SELECT + widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket, + if({USE_PROJECT_ID}, project_id, 0) AS project_id, + if({USE_DOMAIN}, domain, '') AS domain, + if({USE_SITE_PATH}, site_path, '') AS site_path, + if({USE_MONETIZED}, CAST(monetized AS Int8), -1) AS monetized, + if({USE_COUNTRY}, country, '') AS country, + COUNT(*) AS views + FROM views + WHERE + recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END} + -- make sure that the REAL project id is included, + -- not the possibly-zero one, + -- by using `views.project_id` instead of `project_id` + AND views.project_id IN {PROJECT_IDS} + GROUP BY + bucket, project_id, domain, site_path, monetized, country" ) - .fetch_all(&**pool) - .await?.into_iter().map(|x| PayoutValue { - mod_id: x.mod_id, - amount_sum: x.amount_sum, - interval_start: x.interval_start, - }).collect::>() - } else { - sqlx::query!( - " - SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start - FROM payouts_values - WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3 - GROUP by mod_id, interval_start ORDER BY interval_start - ", - &project_ids.iter().map(|x| x.0 as i64).collect::>(), - start_date, - end_date, - duration, - ) - .fetch_all(&**pool) - .await?.into_iter().map(|x| PayoutValue { - mod_id: x.mod_id, - amount_sum: x.amount_sum, - interval_start: x.interval_start, - }).collect::>() }; - let mut hm: HashMap<_, _> = project_ids - .into_iter() - .map(|x| (x.to_string(), HashMap::new())) - .collect::>(); - for value in payouts_values { - if let Some(mod_id) = value.mod_id - && let Some(amount) = value.amount_sum - && let Some(interval_start) = value.interval_start - { - let id_string = to_base62(mod_id as 
u64); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(interval_start.timestamp(), amount); - } - } + #[derive(Debug, clickhouse::Row, serde::Deserialize)] + pub struct DownloadRow { + pub bucket: u64, + pub project_id: DBProjectId, + pub domain: String, + pub site_path: String, + pub version_id: DBVersionId, + pub country: String, + pub downloads: u64, } - Ok(HttpResponse::Ok().json(hm)) + pub const DOWNLOADS: &str = { + const USE_PROJECT_ID: &str = "{use_project_id: Bool}"; + const USE_DOMAIN: &str = "{use_domain: Bool}"; + const USE_SITE_PATH: &str = "{use_site_path: Bool}"; + const USE_VERSION_ID: &str = "{use_version_id: Bool}"; + const USE_COUNTRY: &str = "{use_country: Bool}"; + + formatcp!( + "SELECT + widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket, + if({USE_PROJECT_ID}, project_id, 0) AS project_id, + if({USE_DOMAIN}, domain, '') AS domain, + if({USE_SITE_PATH}, site_path, '') AS site_path, + if({USE_VERSION_ID}, version_id, 0) AS version_id, + if({USE_COUNTRY}, country, '') AS country, + COUNT(*) AS downloads + FROM downloads + WHERE + recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END} + -- make sure that the REAL project id is included, + -- not the possibly-zero one, + -- by using `downloads.project_id` instead of `project_id` + AND downloads.project_id IN {PROJECT_IDS} + GROUP BY + bucket, project_id, domain, site_path, version_id, country" + ) + }; + + #[derive(Debug, clickhouse::Row, serde::Deserialize)] + pub struct PlaytimeRow { + pub bucket: u64, + pub project_id: DBProjectId, + pub version_id: DBVersionId, + pub loader: String, + pub game_version: String, + pub seconds: u64, + } + + pub const PLAYTIME: &str = { + const USE_PROJECT_ID: &str = "{use_project_id: Bool}"; + const USE_VERSION_ID: &str = "{use_version_id: Bool}"; + const USE_LOADER: &str = "{use_loader: Bool}"; + const 
USE_GAME_VERSION: &str = "{use_game_version: Bool}"; + + formatcp!( + "SELECT + widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket, + if({USE_PROJECT_ID}, project_id, 0) AS project_id, + if({USE_VERSION_ID}, version_id, 0) AS version_id, + if({USE_LOADER}, loader, '') AS loader, + if({USE_GAME_VERSION}, game_version, '') AS game_version, + SUM(seconds) AS seconds + FROM playtime + WHERE + recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END} + -- make sure that the REAL project id is included, + -- not the possibly-zero one, + -- by using `playtime.project_id` instead of `project_id` + AND playtime.project_id IN {PROJECT_IDS} + GROUP BY + bucket, project_id, version_id, loader, game_version" + ) + }; } -/// Get country data for a set of projects or versions -/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to downloads. -/// Unknown countries are labeled "". -/// This is usuable to see significant performing countries per project -/// eg: -/// { -/// "4N1tEhnO": { -/// "CAN": 22 -/// } -///} -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. 
-/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch -pub async fn countries_downloads_get( - req: HttpRequest, - clickhouse: web::Data, - data: web::Query, - session_queue: web::Data, +pub async fn get( + http_req: HttpRequest, + req: web::Json, pool: web::Data, redis: web::Data, -) -> Result { - let user = get_user_from_headers( - &req, + session_queue: web::Data, + clickhouse: web::Data, +) -> Result, ApiError> { + let (scopes, user) = get_user_from_headers( + &http_req, &**pool, &redis, &session_queue, Scopes::ANALYTICS, ) - .await - .map(|x| x.1)?; - - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; - - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = - filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; - - // Get the countries - let countries = crate::clickhouse::fetch_countries_downloads( - project_ids.unwrap_or_default(), - start_date, - end_date, - clickhouse.into_inner(), - ) .await?; - let mut hm = HashMap::new(); - for views in countries { - let id_string = to_base62(views.id); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(views.country, views.total); - } + let full_time_range = req.time_range.end - req.time_range.start; + if full_time_range < TimeDelta::zero() { + return Err(ApiError::InvalidInput( + "End date must be after start date".into(), + )); } - let hm: HashMap> = hm - .into_iter() - .map(|(key, value)| (key, condense_countries(value))) - .collect(); - - Ok(HttpResponse::Ok().json(hm)) -} - -/// Get country data for 
a set of projects or versions -/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to views. -/// Unknown countries are labeled "". -/// This is usuable to see significant performing countries per project -/// eg: -/// { -/// "4N1tEhnO": { -/// "CAN": 56165 -/// } -///} -/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. -/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch -pub async fn countries_views_get( - req: HttpRequest, - clickhouse: web::Data, - data: web::Query, - session_queue: web::Data, - pool: web::Data, - redis: web::Data, -) -> Result { - let user = get_user_from_headers( - &req, - &**pool, - &redis, - &session_queue, - Scopes::ANALYTICS, - ) - .await - .map(|x| x.1)?; - - let project_ids = data - .project_ids - .as_ref() - .map(|ids| serde_json::from_str::>(ids)) - .transpose()?; - - let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); - let end_date = data.end_date.unwrap_or(Utc::now()); - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - // - If no project_ids or version_ids are provided, we default to all projects the user has access to - let project_ids = - filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; - - // Get the countries - let countries = crate::clickhouse::fetch_countries_views( - project_ids.unwrap_or_default(), - start_date, - end_date, - clickhouse.into_inner(), - ) - .await?; - - let mut hm = HashMap::new(); - for views in countries { - let id_string = to_base62(views.id); - if !hm.contains_key(&id_string) { - hm.insert(id_string.clone(), HashMap::new()); - } - if let Some(hm) = hm.get_mut(&id_string) { - hm.insert(views.country, views.total); - } - } - - let hm: HashMap> = hm - .into_iter() - .map(|(key, value)| (key, condense_countries(value))) - .collect(); - - 
Ok(HttpResponse::Ok().json(hm)) -} - -fn condense_countries(countries: HashMap) -> HashMap { - // Every country under '15' (view or downloads) should be condensed into 'XX' - let mut hm = HashMap::new(); - for (mut country, count) in countries { - if count < 50 { - country = "XX".to_string(); - } - if !hm.contains_key(&country) { - hm.insert(country.to_string(), 0); - } - if let Some(hm) = hm.get_mut(&country) { - *hm += count; - } - } - hm -} - -async fn filter_allowed_ids( - mut project_ids: Option>, - user: crate::models::users::User, - pool: &web::Data, - redis: &RedisPool, - remove_defaults: Option, -) -> Result>, ApiError> { - // If no project_ids or version_ids are provided, we default to all projects the user has *public* access to - if project_ids.is_none() && !remove_defaults.unwrap_or(false) { - project_ids = Some( - user_item::DBUser::get_projects(user.id.into(), &***pool, redis) - .await? - .into_iter() - .map(|x| ProjectId::from(x).to_string()) - .collect(), - ); - } - - // Convert String list to list of ProjectIds or VersionIds - // - Filter out unauthorized projects/versions - let project_ids = if let Some(project_strings) = project_ids { - let projects_data = database::models::DBProject::get_many( - &project_strings, - &***pool, - redis, - ) - .await?; - - let team_ids = projects_data - .iter() - .map(|x| x.inner.team_id) - .collect::>(); - let team_members = - database::models::DBTeamMember::get_from_team_full_many( - &team_ids, &***pool, redis, - ) - .await?; - - let organization_ids = projects_data - .iter() - .filter_map(|x| x.inner.organization_id) - .collect::>(); - let organizations = database::models::DBOrganization::get_many_ids( - &organization_ids, - &***pool, - redis, - ) - .await?; - - let organization_team_ids = organizations - .iter() - .map(|x| x.team_id) - .collect::>(); - let organization_team_members = - database::models::DBTeamMember::get_from_team_full_many( - &organization_team_ids, - &***pool, - redis, - ) - .await?; - - let 
ids = projects_data - .into_iter() - .filter(|project| { - let team_member = team_members.iter().find(|x| { - x.team_id == project.inner.team_id - && x.user_id == user.id.into() - }); - - let organization = project - .inner - .organization_id - .and_then(|oid| organizations.iter().find(|x| x.id == oid)); - - let organization_team_member = - if let Some(organization) = organization { - organization_team_members.iter().find(|x| { - x.team_id == organization.team_id - && x.user_id == user.id.into() - }) - } else { - None - }; - - let permissions = ProjectPermissions::get_permissions_by_role( - &user.role, - &team_member.cloned(), - &organization_team_member.cloned(), + let (num_time_slices, resolution) = match req.time_range.resolution { + TimeRangeResolution::Slices(slices) => { + let slices = i32::try_from(slices.get()).map_err(|_| { + ApiError::InvalidInput( + "Number of slices must fit into an `i32`".into(), ) - .unwrap_or_default(); + })?; + let resolution = full_time_range / slices; + (slices as usize, resolution) + } + TimeRangeResolution::Minutes(resolution_minutes) => { + let resolution_minutes = i64::try_from(resolution_minutes.get()) + .map_err(|_| { + ApiError::InvalidInput( + "Resolution must fit into a `i64`".into(), + ) + })?; + let resolution = TimeDelta::try_minutes(resolution_minutes) + .ok_or_else(|| { + ApiError::InvalidInput("Resolution overflow".into()) + })?; - permissions.contains(ProjectPermissions::VIEW_ANALYTICS) - }) - .map(|x| x.inner.id.into()) - .collect::>(); + let num_slices = + full_time_range.as_seconds_f64() / resolution.as_seconds_f64(); - Some(ids) - } else { - None + (num_slices as usize, resolution) + } }; - // Only one of project_ids or version_ids will be Some - Ok(project_ids) + + if num_time_slices > MAX_TIME_SLICES { + return Err(ApiError::InvalidInput(format!( + "Resolution is too fine or range is too large - maximum of {MAX_TIME_SLICES} time slices, was {num_time_slices}" + ))); + } + if resolution < MIN_RESOLUTION { + 
return Err(ApiError::InvalidInput(format!( + "Resolution must be at least {MIN_RESOLUTION}, was {resolution}", + ))); + } + + let mut time_slices = vec![TimeSlice::default(); num_time_slices]; + + // TODO fetch from req + let project_ids = + DBUser::get_projects(user.id.into(), &**pool, &redis).await?; + + let project_ids = + filter_allowed_project_ids(&project_ids, &user, &pool, &redis).await?; + + let mut query_clickhouse_cx = QueryClickhouseContext { + clickhouse: &clickhouse, + req: &req, + time_slices: &mut time_slices, + project_ids: &project_ids, + }; + + if let Some(metrics) = &req.return_metrics.project_views { + use ProjectViewsField as F; + let uses = |field| metrics.bucket_by.contains(&field); + + query_clickhouse::( + &mut query_clickhouse_cx, + query::VIEWS, + &[ + ("use_project_id", uses(F::ProjectId)), + ("use_domain", uses(F::Domain)), + ("use_site_path", uses(F::SitePath)), + ("use_monetized", uses(F::Monetized)), + ("use_country", uses(F::Country)), + ], + |row| row.bucket, + |row| { + let country = if uses(F::Country) { + Some(condense_country(row.country, row.views)) + } else { + None + }; + AnalyticsData::Project(ProjectAnalytics { + source_project: row.project_id.into(), + metrics: ProjectMetrics::Views(ProjectViews { + domain: none_if_empty(row.domain), + site_path: none_if_empty(row.site_path), + monetized: match row.monetized { + 0 => Some(false), + 1 => Some(true), + _ => None, + }, + country, + views: row.views, + }), + }) + }, + ) + .await?; + } + + if let Some(metrics) = &req.return_metrics.project_downloads { + use ProjectDownloadsField as F; + let uses = |field| metrics.bucket_by.contains(&field); + + query_clickhouse::( + &mut query_clickhouse_cx, + query::DOWNLOADS, + &[ + ("use_project_id", uses(F::ProjectId)), + ("use_domain", uses(F::Domain)), + ("use_site_path", uses(F::SitePath)), + ("use_version_id", uses(F::VersionId)), + ("use_country", uses(F::Country)), + ], + |row| row.bucket, + |row| { + let country = if 
uses(F::Country) { + Some(condense_country(row.country, row.downloads)) + } else { + None + }; + AnalyticsData::Project(ProjectAnalytics { + source_project: row.project_id.into(), + metrics: ProjectMetrics::Downloads(ProjectDownloads { + domain: none_if_empty(row.domain), + site_path: none_if_empty(row.site_path), + version_id: none_if_zero_version_id(row.version_id), + country, + downloads: row.downloads, + }), + }) + }, + ) + .await?; + } + + if let Some(metrics) = &req.return_metrics.project_playtime { + use ProjectPlaytimeField as F; + let uses = |field| metrics.bucket_by.contains(&field); + + query_clickhouse::( + &mut query_clickhouse_cx, + query::PLAYTIME, + &[ + ("use_project_id", uses(F::ProjectId)), + ("use_version_id", uses(F::VersionId)), + ("use_loader", uses(F::Loader)), + ("use_game_version", uses(F::GameVersion)), + ], + |row| row.bucket, + |row| { + AnalyticsData::Project(ProjectAnalytics { + source_project: row.project_id.into(), + metrics: ProjectMetrics::Playtime(ProjectPlaytime { + version_id: none_if_zero_version_id(row.version_id), + loader: none_if_empty(row.loader), + game_version: none_if_empty(row.game_version), + seconds: row.seconds, + }), + }) + }, + ) + .await?; + } + + if req.return_metrics.project_revenue.is_some() { + if !scopes.contains(Scopes::PAYOUTS_READ) { + return Err(AuthenticationError::InvalidCredentials.into()); + } + + let mut rows = sqlx::query!( + "SELECT + WIDTH_BUCKET( + EXTRACT(EPOCH FROM created)::bigint, + EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint, + $3::integer + ) AS bucket, + COALESCE(mod_id, 0) AS mod_id, + SUM(amount) amount_sum + FROM payouts_values + WHERE + user_id = $4 + AND created BETWEEN $1 AND $2 + GROUP BY bucket, mod_id", + req.time_range.start, + req.time_range.end, + num_time_slices as i64, + DBUserId::from(user.id) as DBUserId, + ) + .fetch(&**pool); + while let Some(row) = 
rows.next().await.transpose()? { + let bucket = row.bucket.ok_or_else(|| { + ApiError::InvalidInput( + "bucket should be non-null - query bug!".into(), + ) + })?; + let bucket = usize::try_from(bucket).map_err(|_| { + ApiError::InvalidInput( + "bucket value {bucket} does not fit into `usize` - query bug!".into(), + ) + })?; + + if let Some(source_project) = + row.mod_id.map(DBProjectId).map(ProjectId::from) + && let Some(revenue) = row.amount_sum + { + add_to_time_slice( + &mut time_slices, + bucket, + AnalyticsData::Project(ProjectAnalytics { + source_project, + metrics: ProjectMetrics::Revenue(ProjectRevenue { + revenue, + }), + }), + )?; + } + } + } + + Ok(web::Json(GetResponse(time_slices))) +} + +fn none_if_empty(s: String) -> Option { + if s.is_empty() { None } else { Some(s) } +} + +fn none_if_zero_version_id(v: DBVersionId) -> Option { + if v.0 == 0 { None } else { Some(v.into()) } +} + +fn condense_country(country: String, count: u64) -> String { + // Every country under '50' (view or downloads) should be condensed into 'XX' + if count < 50 { + "XX".to_string() + } else { + country + } +} + +struct QueryClickhouseContext<'a> { + clickhouse: &'a clickhouse::Client, + req: &'a GetRequest, + time_slices: &'a mut [TimeSlice], + project_ids: &'a [DBProjectId], +} + +async fn query_clickhouse( + cx: &mut QueryClickhouseContext<'_>, + query: &str, + use_columns: &[(&str, bool)], + row_get_bucket: impl Fn(&Row) -> u64, + row_to_analytics: impl Fn(Row) -> AnalyticsData, +) -> Result<(), ApiError> +where + Row: clickhouse::Row + serde::de::DeserializeOwned + std::fmt::Debug, +{ + let mut query = cx + .clickhouse + .query(query) + .param("time_range_start", cx.req.time_range.start.timestamp()) + .param("time_range_end", cx.req.time_range.end.timestamp()) + .param("time_slices", cx.time_slices.len()) + .param("project_ids", cx.project_ids); + for (param_name, used) in use_columns { + query = query.param(param_name, used) + } + let mut cursor = query.fetch::()?; + + 
while let Some(row) = cursor.next().await? { + let bucket = row_get_bucket(&row) as usize; + add_to_time_slice(cx.time_slices, bucket, row_to_analytics(row))?; + } + + Ok(()) +} + +fn add_to_time_slice( + time_slices: &mut [TimeSlice], + bucket: usize, + data: AnalyticsData, +) -> Result<(), ApiError> { + // row.recorded < time_range_start => bucket = 0 + // row.recorded >= time_range_end => bucket = num_time_slices + // (note: this is out of range of `time_slices`!) + let Some(bucket) = bucket.checked_sub(1) else { + return Ok(()); + }; + + let num_time_slices = time_slices.len(); + let slice = time_slices.get_mut(bucket).ok_or_else(|| { + ApiError::InvalidInput( + format!("bucket {bucket} returned by query out of range for {num_time_slices} - query bug!") + ) + })?; + + slice.0.push(data); + Ok(()) +} + +async fn filter_allowed_project_ids( + project_ids: &[DBProjectId], + user: &crate::models::users::User, + pool: &PgPool, + redis: &RedisPool, +) -> Result, ApiError> { + let projects = DBProject::get_many_ids(project_ids, pool, redis).await?; + + let team_ids = projects + .iter() + .map(|x| x.inner.team_id) + .collect::>(); + let team_members = database::models::DBTeamMember::get_from_team_full_many( + &team_ids, pool, redis, + ) + .await?; + + let organization_ids = projects + .iter() + .filter_map(|x| x.inner.organization_id) + .collect::>(); + let organizations = database::models::DBOrganization::get_many_ids( + &organization_ids, + pool, + redis, + ) + .await?; + + let organization_team_ids = organizations + .iter() + .map(|x| x.team_id) + .collect::>(); + let organization_team_members = + database::models::DBTeamMember::get_from_team_full_many( + &organization_team_ids, + pool, + redis, + ) + .await?; + + Ok(projects + .into_iter() + .filter(|project| { + let team_member = team_members.iter().find(|x| { + x.team_id == project.inner.team_id + && x.user_id == user.id.into() + }); + + let organization = project + .inner + .organization_id + .and_then(|oid| 
organizations.iter().find(|x| x.id == oid)); + + let organization_team_member = + if let Some(organization) = organization { + organization_team_members.iter().find(|x| { + x.team_id == organization.team_id + && x.user_id == user.id.into() + }) + } else { + None + }; + + let permissions = ProjectPermissions::get_permissions_by_role( + &user.role, + &team_member.cloned(), + &organization_team_member.cloned(), + ) + .unwrap_or_default(); + + permissions.contains(ProjectPermissions::VIEW_ANALYTICS) + }) + .map(|project| project.inner.id) + .collect::>()) +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn response_format() { + let test_project_1 = ProjectId(123); + let test_project_2 = ProjectId(456); + let test_project_3 = ProjectId(789); + + let src = GetResponse(vec![ + TimeSlice(vec![ + AnalyticsData::Project(ProjectAnalytics { + source_project: test_project_1, + metrics: ProjectMetrics::Views(ProjectViews { + domain: Some("youtube.com".into()), + views: 100, + ..Default::default() + }), + }), + AnalyticsData::Project(ProjectAnalytics { + source_project: test_project_2, + metrics: ProjectMetrics::Downloads(ProjectDownloads { + domain: Some("discord.com".into()), + downloads: 150, + ..Default::default() + }), + }), + ]), + TimeSlice(vec![AnalyticsData::Project(ProjectAnalytics { + source_project: test_project_3, + metrics: ProjectMetrics::Revenue(ProjectRevenue { + revenue: Decimal::new(20000, 2), + }), + })]), + ]); + let target = json!([ + [ + { + "source_project": test_project_1.to_string(), + "metric_kind": "views", + "domain": "youtube.com", + "views": 100, + }, + { + "source_project": test_project_2.to_string(), + "metric_kind": "downloads", + "domain": "discord.com", + "downloads": 150, + } + ], + [ + { + "source_project": test_project_3.to_string(), + "metric_kind": "revenue", + "revenue": "200.00", + } + ] + ]); + + assert_eq!(serde_json::to_value(src).unwrap(), target); + } } diff --git 
a/apps/labrinth/src/routes/v3/analytics_get_old.rs b/apps/labrinth/src/routes/v3/analytics_get_old.rs new file mode 100644 index 000000000..f8d64e7b4 --- /dev/null +++ b/apps/labrinth/src/routes/v3/analytics_get_old.rs @@ -0,0 +1,677 @@ +//! TODO: this module should be removed; it is superceded by `analytics_get` + +use super::ApiError; +use crate::database; +use crate::database::redis::RedisPool; +use crate::models::teams::ProjectPermissions; +use crate::{ + auth::get_user_from_headers, + database::models::user_item, + models::{ + ids::{ProjectId, VersionId}, + pats::Scopes, + }, + queue::session::AuthQueue, +}; +use actix_web::{HttpRequest, HttpResponse, web}; +use ariadne::ids::base62_impl::to_base62; +use chrono::{DateTime, Duration, Utc}; +use eyre::eyre; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use sqlx::postgres::types::PgInterval; +use std::collections::HashMap; +use std::convert::TryInto; +use std::num::NonZeroU32; + +pub fn config(cfg: &mut web::ServiceConfig) { + cfg.service( + web::scope("analytics") + // TODO: since our service shadows analytics v2, we have to redirect here + .route("", web::post().to(super::analytics_get::get)) + .route("playtime", web::get().to(playtimes_get)) + .route("views", web::get().to(views_get)) + .route("downloads", web::get().to(downloads_get)) + .route("revenue", web::get().to(revenue_get)) + .route( + "countries/downloads", + web::get().to(countries_downloads_get), + ) + .route("countries/views", web::get().to(countries_views_get)), + ); +} + +/// The json data to be passed to fetch analytic data +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// start_date and end_date are optional, and default to two weeks ago, and the maximum date respectively. +/// resolution_minutes is optional. 
This refers to the window by which we are looking (every day, every minute, etc) and defaults to 1440 (1 day) +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct GetData { + // only one of project_ids or version_ids should be used + // if neither are provided, all projects the user has access to will be used + pub project_ids: Option, + + pub start_date: Option>, // defaults to 2 weeks ago + pub end_date: Option>, // defaults to now + + pub resolution_minutes: Option, // defaults to 1 day. Ignored in routes that do not aggregate over a resolution (eg: /countries) +} + +/// Get playtime data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of days to playtime data +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 23 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +#[derive(Serialize, Deserialize, Clone)] +pub struct FetchedPlaytime { + pub time: u64, + pub total_seconds: u64, + pub loader_seconds: HashMap, + pub game_version_seconds: HashMap, + pub parent_seconds: HashMap, +} +pub async fn playtimes_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::ANALYTICS, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + let resolution_minutes = data + .resolution_minutes + .map_or(60 * 24, |minutes| minutes.get()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all 
projects the user has access to + let project_ids = + filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; + + // Get the views + let playtimes = crate::clickhouse::fetch_playtimes( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for playtime in playtimes { + let id_string = to_base62(playtime.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(playtime.time, playtime.total); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get view data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of days to views +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 1090 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +pub async fn views_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::ANALYTICS, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + let resolution_minutes = data + .resolution_minutes + .map_or(60 * 24, |minutes| minutes.get()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let project_ids = + filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; + + // Get the views + let views = 
crate::clickhouse::fetch_views( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in views { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(views.time, views.total); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get download data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of days to downloads +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 32 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +pub async fn downloads_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user_option = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::ANALYTICS, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + let resolution_minutes = data + .resolution_minutes + .map_or(60 * 24, |minutes| minutes.get()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let project_ids = + filter_allowed_ids(project_ids, user_option, &pool, &redis, None) + .await?; + + // Get the downloads + let downloads = crate::clickhouse::fetch_downloads( + project_ids.unwrap_or_default(), + start_date, + end_date, + resolution_minutes, + clickhouse.into_inner(), + ) + 
.await?; + + let mut hm = HashMap::new(); + for downloads in downloads { + let id_string = to_base62(downloads.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(downloads.time, downloads.total); + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get payout data for a set of projects +/// Data is returned as a hashmap of project ids to a hashmap of days to amount earned per day +/// eg: +/// { +/// "4N1tEhnO": { +/// "20230824": 0.001 +/// } +///} +/// ONLY project IDs can be used. Unauthorized projects will be filtered out. +pub async fn revenue_get( + req: HttpRequest, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::PAYOUTS_READ, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + let resolution_minutes = data + .resolution_minutes + .map_or(60 * 24, |minutes| minutes.get()); + + // Round up/down to nearest duration as we are using pgadmin, does not have rounding in the fetch command + // Round start_date down to nearest resolution + let diff = start_date.timestamp() % (resolution_minutes as i64 * 60); + let start_date = start_date - Duration::seconds(diff); + + // Round end_date up to nearest resolution + let diff = end_date.timestamp() % (resolution_minutes as i64 * 60); + let end_date = + end_date + Duration::seconds((resolution_minutes as i64 * 60) - diff); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let project_ids = 
filter_allowed_ids( + project_ids, + user.clone(), + &pool, + &redis, + Some(true), + ) + .await?; + + let duration: PgInterval = Duration::minutes(resolution_minutes as i64) + .try_into() + .map_err(|_| { + ApiError::Request(eyre!("Invalid `resolution_minutes`")) + })?; + // Get the revenue data + let project_ids = project_ids.unwrap_or_default(); + + struct PayoutValue { + mod_id: Option, + amount_sum: Option, + interval_start: Option>, + } + + let payouts_values = if project_ids.is_empty() { + sqlx::query!( + " + SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start + FROM payouts_values + WHERE user_id = $1 AND created BETWEEN $2 AND $3 + GROUP by mod_id, interval_start ORDER BY interval_start + ", + user.id.0 as i64, + start_date, + end_date, + duration, + ) + .fetch_all(&**pool) + .await?.into_iter().map(|x| PayoutValue { + mod_id: x.mod_id, + amount_sum: x.amount_sum, + interval_start: x.interval_start, + }).collect::>() + } else { + sqlx::query!( + " + SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start + FROM payouts_values + WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3 + GROUP by mod_id, interval_start ORDER BY interval_start + ", + &project_ids.iter().map(|x| x.0 as i64).collect::>(), + start_date, + end_date, + duration, + ) + .fetch_all(&**pool) + .await?.into_iter().map(|x| PayoutValue { + mod_id: x.mod_id, + amount_sum: x.amount_sum, + interval_start: x.interval_start, + }).collect::>() + }; + + let mut hm: HashMap<_, _> = project_ids + .into_iter() + .map(|x| (x.to_string(), HashMap::new())) + .collect::>(); + for value in payouts_values { + if let Some(mod_id) = value.mod_id + && let Some(amount) = value.amount_sum + && let Some(interval_start) = value.interval_start + { + let id_string = to_base62(mod_id as u64); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = 
hm.get_mut(&id_string) { + hm.insert(interval_start.timestamp(), amount); + } + } + } + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get country data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to downloads. +/// Unknown countries are labeled "". +/// This is usuable to see significant performing countries per project +/// eg: +/// { +/// "4N1tEhnO": { +/// "CAN": 22 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch +pub async fn countries_downloads_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::ANALYTICS, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let project_ids = + filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; + + // Get the countries + let countries = crate::clickhouse::fetch_countries_downloads( + project_ids.unwrap_or_default(), + start_date, + end_date, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in countries { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + if let Some(hm) = hm.get_mut(&id_string) { + 
hm.insert(views.country, views.total); + } + } + + let hm: HashMap> = hm + .into_iter() + .map(|(key, value)| (key, condense_countries(value))) + .collect(); + + Ok(HttpResponse::Ok().json(hm)) +} + +/// Get country data for a set of projects or versions +/// Data is returned as a hashmap of project/version ids to a hashmap of coutnry to views. +/// Unknown countries are labeled "". +/// This is usuable to see significant performing countries per project +/// eg: +/// { +/// "4N1tEhnO": { +/// "CAN": 56165 +/// } +///} +/// Either a list of project_ids or version_ids can be used, but not both. Unauthorized projects/versions will be filtered out. +/// For this endpoint, provided dates are a range to aggregate over, not specific days to fetch +pub async fn countries_views_get( + req: HttpRequest, + clickhouse: web::Data, + data: web::Query, + session_queue: web::Data, + pool: web::Data, + redis: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::ANALYTICS, + ) + .await + .map(|x| x.1)?; + + let project_ids = data + .project_ids + .as_ref() + .map(|ids| serde_json::from_str::>(ids)) + .transpose()?; + + let start_date = data.start_date.unwrap_or(Utc::now() - Duration::weeks(2)); + let end_date = data.end_date.unwrap_or(Utc::now()); + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + // - If no project_ids or version_ids are provided, we default to all projects the user has access to + let project_ids = + filter_allowed_ids(project_ids, user, &pool, &redis, None).await?; + + // Get the countries + let countries = crate::clickhouse::fetch_countries_views( + project_ids.unwrap_or_default(), + start_date, + end_date, + clickhouse.into_inner(), + ) + .await?; + + let mut hm = HashMap::new(); + for views in countries { + let id_string = to_base62(views.id); + if !hm.contains_key(&id_string) { + hm.insert(id_string.clone(), HashMap::new()); + } + 
if let Some(hm) = hm.get_mut(&id_string) { + hm.insert(views.country, views.total); + } + } + + let hm: HashMap> = hm + .into_iter() + .map(|(key, value)| (key, condense_countries(value))) + .collect(); + + Ok(HttpResponse::Ok().json(hm)) +} + +fn condense_countries(countries: HashMap) -> HashMap { + // Every country under '15' (view or downloads) should be condensed into 'XX' + let mut hm = HashMap::new(); + for (mut country, count) in countries { + if count < 50 { + country = "XX".to_string(); + } + if !hm.contains_key(&country) { + hm.insert(country.to_string(), 0); + } + if let Some(hm) = hm.get_mut(&country) { + *hm += count; + } + } + hm +} + +async fn filter_allowed_ids( + mut project_ids: Option>, + user: crate::models::users::User, + pool: &web::Data, + redis: &RedisPool, + remove_defaults: Option, +) -> Result>, ApiError> { + // If no project_ids or version_ids are provided, we default to all projects the user has *public* access to + if project_ids.is_none() && !remove_defaults.unwrap_or(false) { + project_ids = Some( + user_item::DBUser::get_projects(user.id.into(), &***pool, redis) + .await? 
+ .into_iter() + .map(|x| ProjectId::from(x).to_string()) + .collect(), + ); + } + + // Convert String list to list of ProjectIds or VersionIds + // - Filter out unauthorized projects/versions + let project_ids = if let Some(project_strings) = project_ids { + let projects_data = database::models::DBProject::get_many( + &project_strings, + &***pool, + redis, + ) + .await?; + + let team_ids = projects_data + .iter() + .map(|x| x.inner.team_id) + .collect::>(); + let team_members = + database::models::DBTeamMember::get_from_team_full_many( + &team_ids, &***pool, redis, + ) + .await?; + + let organization_ids = projects_data + .iter() + .filter_map(|x| x.inner.organization_id) + .collect::>(); + let organizations = database::models::DBOrganization::get_many_ids( + &organization_ids, + &***pool, + redis, + ) + .await?; + + let organization_team_ids = organizations + .iter() + .map(|x| x.team_id) + .collect::>(); + let organization_team_members = + database::models::DBTeamMember::get_from_team_full_many( + &organization_team_ids, + &***pool, + redis, + ) + .await?; + + let ids = projects_data + .into_iter() + .filter(|project| { + let team_member = team_members.iter().find(|x| { + x.team_id == project.inner.team_id + && x.user_id == user.id.into() + }); + + let organization = project + .inner + .organization_id + .and_then(|oid| organizations.iter().find(|x| x.id == oid)); + + let organization_team_member = + if let Some(organization) = organization { + organization_team_members.iter().find(|x| { + x.team_id == organization.team_id + && x.user_id == user.id.into() + }) + } else { + None + }; + + let permissions = ProjectPermissions::get_permissions_by_role( + &user.role, + &team_member.cloned(), + &organization_team_member.cloned(), + ) + .unwrap_or_default(); + + permissions.contains(ProjectPermissions::VIEW_ANALYTICS) + }) + .map(|x| x.inner.id.into()) + .collect::>(); + + Some(ids) + } else { + None + }; + // Only one of project_ids or version_ids will be Some + 
Ok(project_ids) +} diff --git a/apps/labrinth/src/routes/v3/mod.rs b/apps/labrinth/src/routes/v3/mod.rs index 25c4ed6ca..7a989d81d 100644 --- a/apps/labrinth/src/routes/v3/mod.rs +++ b/apps/labrinth/src/routes/v3/mod.rs @@ -4,6 +4,7 @@ use actix_web::{HttpResponse, web}; use serde_json::json; pub mod analytics_get; +pub mod analytics_get_old; pub mod collections; pub mod friends; pub mod images; @@ -32,7 +33,8 @@ pub fn config(cfg: &mut web::ServiceConfig) { web::scope("v3") .wrap(default_cors()) .configure(limits::config) - .configure(analytics_get::config) + // .configure(analytics_get::config) // TODO: see `analytics_get` + .configure(analytics_get_old::config) .configure(collections::config) .configure(images::config) .configure(notifications::config)