Fetch project analytics events on analytics get (#6143)

* Fetch project analytics events

* fix

* post-query ua bucketing

* fmt
This commit is contained in:
aecsocket
2026-05-22 19:32:33 +01:00
committed by GitHub
parent 657186398d
commit 5727e156ed
2 changed files with 274 additions and 43 deletions
@@ -0,0 +1,36 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n t.mod_id AS \"project_id!\",\n tm.created,\n tm.body AS \"body: sqlx::types::Json<MessageBody>\"\n FROM threads_messages tm\n INNER JOIN threads t ON t.id = tm.thread_id\n WHERE\n t.mod_id = ANY($1)\n AND tm.body->>'type' = 'status_change'\n AND tm.created BETWEEN $2 AND $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "project_id!",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "created",
"type_info": "Timestamptz"
},
{
"ordinal": 2,
"name": "body: sqlx::types::Json<MessageBody>",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Int8Array",
"Timestamptz",
"Timestamptz"
]
},
"nullable": [
true,
false,
false
]
},
"hash": "a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591"
}
+238 -43
View File
@@ -9,7 +9,7 @@
mod old;
use std::{num::NonZeroU64, sync::LazyLock};
use std::{collections::HashMap, num::NonZeroU64, sync::LazyLock};
use crate::database::PgPool;
use actix_web::{HttpRequest, post, web};
@@ -21,19 +21,24 @@ use rust_decimal::Decimal;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _};
use crate::{
auth::{AuthenticationError, get_user_from_headers},
auth::{
AuthenticationError, checks::filter_visible_version_ids,
get_user_from_headers,
},
database::{
self, DBProject,
models::{
DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBUserId,
DBVersionId,
DBVersion, DBVersionId,
},
redis::RedisPool,
},
models::{
ids::{AffiliateCodeId, ProjectId, VersionId},
pats::Scopes,
projects::ProjectStatus,
teams::ProjectPermissions,
threads::MessageBody,
v3::analytics::DownloadReason,
},
queue::session::AuthQueue,
@@ -255,17 +260,46 @@ pub const MAX_TIME_SLICES: usize = 1024;
/// Response for a [`GetRequest`].
#[derive(Debug, Default, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FetchResponse {
pub struct GetResponse {
/// List of N [`TimeSlice`]s, where each slice represents an equal
/// time interval of metrics collection. The number of slices is determined
/// by [`GetRequest::time_range`].
pub metrics: Vec<TimeSlice>,
/// List of events associated with projects that were requested.
pub project_events: Vec<ProjectAnalyticsEvent>,
}
/// Single time interval of metrics collection.
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
pub struct TimeSlice(pub Vec<AnalyticsData>);
/// Notable update to a project which may reflect in analytics metrics.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct ProjectAnalyticsEvent {
/// ID of the event's project.
pub project_id: ProjectId,
/// When the event occurred.
pub timestamp: DateTime<Utc>,
#[serde(flatten)]
pub kind: ProjectAnalyticsEventKind,
}
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProjectAnalyticsEventKind {
/// New version of this project was uploaded.
VersionUploaded {
version_id: VersionId,
version_name: String,
version_number: String,
},
/// Project changed status.
StatusChanged {
status_from: ProjectStatus,
status_to: ProjectStatus,
},
}
/// Metrics collected in a single [`TimeSlice`].
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(untagged)] // the presence of `source_project`, `source_affiliate_code` determines the kind
@@ -351,7 +385,7 @@ pub struct ProjectDownloads {
downloads: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, utoipa::ToSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, utoipa::ToSchema)]
pub enum DownloadSource {
Website,
ModrinthApp,
@@ -665,7 +699,7 @@ mod query {
/// Fetches analytics data for the authorized user's projects.
#[utoipa::path(
responses((status = OK, body = inline(FetchResponse))),
responses((status = OK, body = inline(GetResponse))),
)]
#[post("")]
pub async fn fetch_analytics(
@@ -675,7 +709,7 @@ pub async fn fetch_analytics(
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
clickhouse: web::Data<clickhouse::Client>,
) -> Result<web::Json<FetchResponse>, ApiError> {
) -> Result<web::Json<GetResponse>, ApiError> {
let (scopes, user) = get_user_from_headers(
&http_req,
&**pool,
@@ -768,6 +802,44 @@ pub async fn fetch_analytics(
.iter()
.map(|version| DBProjectId(version.mod_id))
.collect::<Vec<_>>();
let parent_version_data =
DBVersion::get_many(&parent_version_ids, &**pool, &redis).await?;
let visible_version_ids = filter_visible_version_ids(
parent_version_data
.iter()
.map(|version| &version.inner)
.collect(),
&Some(user.clone()),
&pool,
&redis,
)
.await?;
let mut project_events = parent_version_data
.iter()
.filter(|version| {
visible_version_ids.contains(&version.inner.id)
&& version.inner.date_published >= req.time_range.start
&& version.inner.date_published <= req.time_range.end
})
.map(|version| ProjectAnalyticsEvent {
project_id: version.inner.project_id.into(),
timestamp: version.inner.date_published,
kind: ProjectAnalyticsEventKind::VersionUploaded {
version_id: version.inner.id.into(),
version_name: version.inner.name.clone(),
version_number: version.inner.version_number.clone(),
},
})
.collect::<Vec<_>>();
project_events.extend(
fetch_project_status_change_events(
&project_ids,
&req.time_range,
&pool,
)
.await?,
);
project_events.sort_by_key(|event| event.timestamp);
let affiliate_code_ids =
DBAffiliateCode::get_by_affiliate(user.id.into(), &**pool)
@@ -830,9 +902,8 @@ pub async fn fetch_analytics(
use ProjectDownloadsField as F;
let uses = |field| metrics.bucket_by.contains(&field);
query_clickhouse::<query::DownloadRow>(
query_clickhouse_downloads(
&mut query_clickhouse_cx,
query::DOWNLOADS,
&[
("use_project_id", uses(F::ProjectId)),
("use_domain", uses(F::Domain)),
@@ -844,37 +915,6 @@ pub async fn fetch_analytics(
("use_game_version", uses(F::GameVersion)),
("use_loader", uses(F::Loader)),
],
|row| row.bucket,
|row| {
let country = if uses(F::Country) {
Some(condense_country(row.country, row.downloads))
} else {
None
};
AnalyticsData::Project(ProjectAnalytics {
source_project: row.project_id.into(),
metrics: ProjectMetrics::Downloads(ProjectDownloads {
domain: none_if_empty(row.domain),
user_agent: if uses(F::UserAgent) {
normalize_download_source(&row.user_agent)
} else {
None
},
version_id: none_if_zero_version_id(row.version_id),
monetized: match row.monetized {
0 => Some(false),
1 => Some(true),
_ => None,
},
country,
reason: none_if_empty(row.reason)
.and_then(|s| s.parse().ok()),
game_version: none_if_empty(row.game_version),
loader: none_if_empty(row.loader),
downloads: row.downloads,
}),
})
},
)
.await?;
}
@@ -1103,8 +1143,9 @@ pub async fn fetch_analytics(
}
}
Ok(web::Json(FetchResponse {
Ok(web::Json(GetResponse {
metrics: time_slices,
project_events,
}))
}
@@ -1195,6 +1236,57 @@ fn condense_country(country: String, count: u64) -> String {
}
}
async fn fetch_project_status_change_events(
project_ids: &[DBProjectId],
time_range: &TimeRange,
pool: &PgPool,
) -> Result<Vec<ProjectAnalyticsEvent>, ApiError> {
let project_id_values =
project_ids.iter().map(|id| id.0).collect::<Vec<_>>();
let rows = sqlx::query!(
r#"
SELECT
t.mod_id AS "project_id!",
tm.created,
tm.body AS "body: sqlx::types::Json<MessageBody>"
FROM threads_messages tm
INNER JOIN threads t ON t.id = tm.thread_id
WHERE
t.mod_id = ANY($1)
AND tm.body->>'type' = 'status_change'
AND tm.created BETWEEN $2 AND $3
"#,
&project_id_values,
time_range.start,
time_range.end,
)
.fetch_all(&**pool)
.await?;
Ok(rows
.into_iter()
.filter_map(|row| {
let MessageBody::StatusChange {
old_status,
new_status,
} = row.body.0
else {
return None;
};
Some(ProjectAnalyticsEvent {
project_id: DBProjectId(row.project_id).into(),
timestamp: row.created,
kind: ProjectAnalyticsEventKind::StatusChanged {
status_from: old_status,
status_to: new_status,
},
})
})
.collect())
}
struct QueryClickhouseContext<'a> {
clickhouse: &'a clickhouse::Client,
req: &'a GetRequest,
@@ -1205,6 +1297,107 @@ struct QueryClickhouseContext<'a> {
affiliate_code_ids: &'a [DBAffiliateCodeId],
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DownloadBucket {
bucket: u64,
project_id: DBProjectId,
domain: Option<String>,
user_agent: Option<DownloadSource>,
version_id: Option<DBVersionId>,
monetized: Option<bool>,
country: Option<String>,
reason: Option<DownloadReason>,
game_version: Option<String>,
loader: Option<String>,
}
async fn query_clickhouse_downloads(
cx: &mut QueryClickhouseContext<'_>,
use_columns: &[(&str, bool)],
) -> Result<(), ApiError> {
let mut query = cx
.clickhouse
.query(query::DOWNLOADS)
.param("time_range_start", cx.req.time_range.start.timestamp())
.param("time_range_end", cx.req.time_range.end.timestamp())
.param("time_slices", cx.time_slices.len())
.param("project_ids", cx.project_ids)
.param("parent_version_ids", cx.parent_version_ids)
.param("parent_version_project_ids", cx.parent_version_project_ids)
.param("affiliate_code_ids", cx.affiliate_code_ids);
for (param_name, used) in use_columns {
query = query.param(param_name, used)
}
let uses = |name| {
use_columns
.iter()
.any(|(column_name, used)| *column_name == name && *used)
};
let mut cursor = query.fetch::<query::DownloadRow>()?;
let mut buckets = HashMap::<DownloadBucket, u64>::new();
while let Some(row) = cursor.next().await? {
let key = DownloadBucket {
bucket: row.bucket,
project_id: row.project_id,
domain: uses("use_domain").then(|| row.domain.clone()),
user_agent: uses("use_user_agent")
.then(|| normalize_download_source(&row.user_agent))
.flatten(),
version_id: uses("use_version_id").then_some(row.version_id),
monetized: if uses("use_monetized") {
match row.monetized {
0 => Some(false),
1 => Some(true),
_ => None,
}
} else {
None
},
country: uses("use_country").then(|| row.country.clone()),
reason: if uses("use_reason") {
none_if_empty(row.reason.clone()).and_then(|s| s.parse().ok())
} else {
None
},
game_version: uses("use_game_version")
.then(|| row.game_version.clone()),
loader: uses("use_loader").then(|| row.loader.clone()),
};
*buckets.entry(key).or_default() += row.downloads;
}
for (key, downloads) in buckets {
let bucket = key.bucket as usize;
add_to_time_slice(
cx.time_slices,
bucket,
AnalyticsData::Project(ProjectAnalytics {
source_project: key.project_id.into(),
metrics: ProjectMetrics::Downloads(ProjectDownloads {
domain: key.domain.and_then(none_if_empty),
user_agent: key.user_agent,
version_id: key
.version_id
.and_then(none_if_zero_version_id),
monetized: key.monetized,
country: key
.country
.map(|country| condense_country(country, downloads)),
reason: key.reason,
game_version: key.game_version.and_then(none_if_empty),
loader: key.loader.and_then(none_if_empty),
downloads,
}),
}),
)?;
}
Ok(())
}
async fn query_clickhouse<Row>(
cx: &mut QueryClickhouseContext<'_>,
query: &str,
@@ -1395,7 +1588,7 @@ mod tests {
let test_project_2 = ProjectId(456);
let test_project_3 = ProjectId(789);
let src = FetchResponse {
let src = GetResponse {
metrics: vec![
TimeSlice(vec![
AnalyticsData::Project(ProjectAnalytics {
@@ -1422,6 +1615,7 @@ mod tests {
}),
})]),
],
project_events: vec![],
};
let target = json!({
"metrics": [
@@ -1446,7 +1640,8 @@ mod tests {
"revenue": "200.00",
}
]
]
],
"project_events": []
});
assert_eq!(serde_json::to_value(src).unwrap(), target);