From 5727e156eda9dc310ade0539bef75ef5398dd2b0 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Fri, 22 May 2026 19:32:33 +0100 Subject: [PATCH] Fetch project analytics events on analytics get (#6143) * Fetch project analytics events * fix * post-query ua bucketing * fmt --- ...db5e0c78f8f3896c1e9f2b2b56cce41fa1591.json | 36 +++ apps/labrinth/src/routes/v3/analytics_get.rs | 281 +++++++++++++++--- 2 files changed, 274 insertions(+), 43 deletions(-) create mode 100644 apps/labrinth/.sqlx/query-a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591.json diff --git a/apps/labrinth/.sqlx/query-a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591.json b/apps/labrinth/.sqlx/query-a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591.json new file mode 100644 index 000000000..2ad563e21 --- /dev/null +++ b/apps/labrinth/.sqlx/query-a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591.json @@ -0,0 +1,36 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n t.mod_id AS \"project_id!\",\n tm.created,\n tm.body AS \"body: sqlx::types::Json\"\n FROM threads_messages tm\n INNER JOIN threads t ON t.id = tm.thread_id\n WHERE\n t.mod_id = ANY($1)\n AND tm.body->>'type' = 'status_change'\n AND tm.created BETWEEN $2 AND $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "project_id!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "created", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "body: sqlx::types::Json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8Array", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [ + true, + false, + false + ] + }, + "hash": "a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591" +} diff --git a/apps/labrinth/src/routes/v3/analytics_get.rs b/apps/labrinth/src/routes/v3/analytics_get.rs index 64a80418d..76998c9f8 100644 --- a/apps/labrinth/src/routes/v3/analytics_get.rs +++ b/apps/labrinth/src/routes/v3/analytics_get.rs @@ -9,7 +9,7 @@ mod old; -use std::{num::NonZeroU64, sync::LazyLock}; +use std::{collections::HashMap, num::NonZeroU64, sync::LazyLock}; use crate::database::PgPool; use actix_web::{HttpRequest, post, web}; @@ -21,19 +21,24 @@ use rust_decimal::Decimal; use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _}; use crate::{ - auth::{AuthenticationError, get_user_from_headers}, + auth::{ + AuthenticationError, checks::filter_visible_version_ids, + get_user_from_headers, + }, database::{ self, DBProject, models::{ DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBUserId, - DBVersionId, + DBVersion, DBVersionId, }, redis::RedisPool, }, models::{ ids::{AffiliateCodeId, ProjectId, VersionId}, pats::Scopes, + projects::ProjectStatus, teams::ProjectPermissions, + threads::MessageBody, v3::analytics::DownloadReason, }, queue::session::AuthQueue, @@ -255,17 +260,46 @@ pub const MAX_TIME_SLICES: usize = 1024; /// Response for a [`GetRequest`]. #[derive(Debug, Default, Serialize, Deserialize, utoipa::ToSchema)] -pub struct FetchResponse { +pub struct GetResponse { /// List of N [`TimeSlice`]s, where each slice represents an equal /// time interval of metrics collection. The number of slices is determined /// by [`GetRequest::time_range`]. pub metrics: Vec, + /// List of events associated with projects that were requested. + pub project_events: Vec, } /// Single time interval of metrics collection. #[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)] pub struct TimeSlice(pub Vec); +/// Notable update to a project which may reflect in analytics metrics. +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +pub struct ProjectAnalyticsEvent { + /// ID of the event's project. + pub project_id: ProjectId, + /// When the event occurred. + pub timestamp: DateTime, + #[serde(flatten)] + pub kind: ProjectAnalyticsEventKind, +} + +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum ProjectAnalyticsEventKind { + /// New version of this project was uploaded. + VersionUploaded { + version_id: VersionId, + version_name: String, + version_number: String, + }, + /// Project changed status. + StatusChanged { + status_from: ProjectStatus, + status_to: ProjectStatus, + }, +} + /// Metrics collected in a single [`TimeSlice`]. #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] #[serde(untagged)] // the presence of `source_project`, `source_affiliate_code` determines the kind @@ -351,7 +385,7 @@ pub struct ProjectDownloads { downloads: u64, } -#[derive(Debug, Clone, PartialEq, Eq, utoipa::ToSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, utoipa::ToSchema)] pub enum DownloadSource { Website, ModrinthApp, @@ -665,7 +699,7 @@ mod query { /// Fetches analytics data for the authorized user's projects. #[utoipa::path( - responses((status = OK, body = inline(FetchResponse))), + responses((status = OK, body = inline(GetResponse))), )] #[post("")] pub async fn fetch_analytics( @@ -675,7 +709,7 @@ pub async fn fetch_analytics( redis: web::Data, session_queue: web::Data, clickhouse: web::Data, -) -> Result, ApiError> { +) -> Result, ApiError> { let (scopes, user) = get_user_from_headers( &http_req, &**pool, @@ -768,6 +802,44 @@ pub async fn fetch_analytics( .iter() .map(|version| DBProjectId(version.mod_id)) .collect::>(); + let parent_version_data = + DBVersion::get_many(&parent_version_ids, &**pool, &redis).await?; + let visible_version_ids = filter_visible_version_ids( + parent_version_data + .iter() + .map(|version| &version.inner) + .collect(), + &Some(user.clone()), + &pool, + &redis, + ) + .await?; + let mut project_events = parent_version_data + .iter() + .filter(|version| { + visible_version_ids.contains(&version.inner.id) + && version.inner.date_published >= req.time_range.start + && version.inner.date_published <= req.time_range.end + }) + .map(|version| ProjectAnalyticsEvent { + project_id: version.inner.project_id.into(), + timestamp: version.inner.date_published, + kind: ProjectAnalyticsEventKind::VersionUploaded { + version_id: version.inner.id.into(), + version_name: version.inner.name.clone(), + version_number: version.inner.version_number.clone(), + }, + }) + .collect::>(); + project_events.extend( + fetch_project_status_change_events( + &project_ids, + &req.time_range, + &pool, + ) + .await?, + ); + project_events.sort_by_key(|event| event.timestamp); let affiliate_code_ids = DBAffiliateCode::get_by_affiliate(user.id.into(), &**pool) @@ -830,9 +902,8 @@ pub async fn fetch_analytics( use ProjectDownloadsField as F; let uses = |field| metrics.bucket_by.contains(&field); - query_clickhouse::( + query_clickhouse_downloads( &mut query_clickhouse_cx, - query::DOWNLOADS, &[ ("use_project_id", uses(F::ProjectId)), ("use_domain", uses(F::Domain)), @@ -844,37 +915,6 @@ pub async fn fetch_analytics( ("use_game_version", uses(F::GameVersion)), ("use_loader", uses(F::Loader)), ], - |row| row.bucket, - |row| { - let country = if uses(F::Country) { - Some(condense_country(row.country, row.downloads)) - } else { - None - }; - AnalyticsData::Project(ProjectAnalytics { - source_project: row.project_id.into(), - metrics: ProjectMetrics::Downloads(ProjectDownloads { - domain: none_if_empty(row.domain), - user_agent: if uses(F::UserAgent) { - normalize_download_source(&row.user_agent) - } else { - None - }, - version_id: none_if_zero_version_id(row.version_id), - monetized: match row.monetized { - 0 => Some(false), - 1 => Some(true), - _ => None, - }, - country, - reason: none_if_empty(row.reason) - .and_then(|s| s.parse().ok()), - game_version: none_if_empty(row.game_version), - loader: none_if_empty(row.loader), - downloads: row.downloads, - }), - }) - }, ) .await?; } @@ -1103,8 +1143,9 @@ pub async fn fetch_analytics( } } - Ok(web::Json(FetchResponse { + Ok(web::Json(GetResponse { metrics: time_slices, + project_events, })) } @@ -1195,6 +1236,57 @@ fn condense_country(country: String, count: u64) -> String { } } +async fn fetch_project_status_change_events( + project_ids: &[DBProjectId], + time_range: &TimeRange, + pool: &PgPool, +) -> Result, ApiError> { + let project_id_values = + project_ids.iter().map(|id| id.0).collect::>(); + + let rows = sqlx::query!( + r#" + SELECT + t.mod_id AS "project_id!", + tm.created, + tm.body AS "body: sqlx::types::Json" + FROM threads_messages tm + INNER JOIN threads t ON t.id = tm.thread_id + WHERE + t.mod_id = ANY($1) + AND tm.body->>'type' = 'status_change' + AND tm.created BETWEEN $2 AND $3 + "#, + &project_id_values, + time_range.start, + time_range.end, + ) + .fetch_all(&**pool) + .await?; + + Ok(rows + .into_iter() + .filter_map(|row| { + let MessageBody::StatusChange { + old_status, + new_status, + } = row.body.0 + else { + return None; + }; + + Some(ProjectAnalyticsEvent { + project_id: DBProjectId(row.project_id).into(), + timestamp: row.created, + kind: ProjectAnalyticsEventKind::StatusChanged { + status_from: old_status, + status_to: new_status, + }, + }) + }) + .collect()) +} + struct QueryClickhouseContext<'a> { clickhouse: &'a clickhouse::Client, req: &'a GetRequest, @@ -1205,6 +1297,107 @@ struct QueryClickhouseContext<'a> { affiliate_code_ids: &'a [DBAffiliateCodeId], } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct DownloadBucket { + bucket: u64, + project_id: DBProjectId, + domain: Option, + user_agent: Option, + version_id: Option, + monetized: Option, + country: Option, + reason: Option, + game_version: Option, + loader: Option, +} + +async fn query_clickhouse_downloads( + cx: &mut QueryClickhouseContext<'_>, + use_columns: &[(&str, bool)], +) -> Result<(), ApiError> { + let mut query = cx + .clickhouse + .query(query::DOWNLOADS) + .param("time_range_start", cx.req.time_range.start.timestamp()) + .param("time_range_end", cx.req.time_range.end.timestamp()) + .param("time_slices", cx.time_slices.len()) + .param("project_ids", cx.project_ids) + .param("parent_version_ids", cx.parent_version_ids) + .param("parent_version_project_ids", cx.parent_version_project_ids) + .param("affiliate_code_ids", cx.affiliate_code_ids); + for (param_name, used) in use_columns { + query = query.param(param_name, used) + } + + let uses = |name| { + use_columns + .iter() + .any(|(column_name, used)| *column_name == name && *used) + }; + let mut cursor = query.fetch::()?; + let mut buckets = HashMap::::new(); + + while let Some(row) = cursor.next().await? { + let key = DownloadBucket { + bucket: row.bucket, + project_id: row.project_id, + domain: uses("use_domain").then(|| row.domain.clone()), + user_agent: uses("use_user_agent") + .then(|| normalize_download_source(&row.user_agent)) + .flatten(), + version_id: uses("use_version_id").then_some(row.version_id), + monetized: if uses("use_monetized") { + match row.monetized { + 0 => Some(false), + 1 => Some(true), + _ => None, + } + } else { + None + }, + country: uses("use_country").then(|| row.country.clone()), + reason: if uses("use_reason") { + none_if_empty(row.reason.clone()).and_then(|s| s.parse().ok()) + } else { + None + }, + game_version: uses("use_game_version") + .then(|| row.game_version.clone()), + loader: uses("use_loader").then(|| row.loader.clone()), + }; + + *buckets.entry(key).or_default() += row.downloads; + } + + for (key, downloads) in buckets { + let bucket = key.bucket as usize; + add_to_time_slice( + cx.time_slices, + bucket, + AnalyticsData::Project(ProjectAnalytics { + source_project: key.project_id.into(), + metrics: ProjectMetrics::Downloads(ProjectDownloads { + domain: key.domain.and_then(none_if_empty), + user_agent: key.user_agent, + version_id: key + .version_id + .and_then(none_if_zero_version_id), + monetized: key.monetized, + country: key + .country + .map(|country| condense_country(country, downloads)), + reason: key.reason, + game_version: key.game_version.and_then(none_if_empty), + loader: key.loader.and_then(none_if_empty), + downloads, + }), + }), + )?; + } + + Ok(()) +} + async fn query_clickhouse( cx: &mut QueryClickhouseContext<'_>, query: &str, @@ -1395,7 +1588,7 @@ mod tests { let test_project_2 = ProjectId(456); let test_project_3 = ProjectId(789); - let src = FetchResponse { + let src = GetResponse { metrics: vec![ TimeSlice(vec![ AnalyticsData::Project(ProjectAnalytics { @@ -1422,6 +1615,7 @@ mod tests { }), })]), ], + project_events: vec![], }; let target = json!({ "metrics": [ @@ -1446,7 +1640,8 @@ mod tests { "revenue": "200.00", } ] - ] + ], + "project_events": [] }); assert_eq!(serde_json::to_value(src).unwrap(), target);