You've already forked AstralRinth
Improve analytics performance, analytics faceting (#6180)
* fix uri too long * all projects route for user * analytics facet fetching * cache download source regexes * filtering * prepare * Split up analytics metrics into separate modules * prepare * fix ci
This commit is contained in:
Generated
+22
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT id\n FROM versions\n WHERE mod_id = ANY($1)\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "12fa322e09465aab925ac33f8b2a1371fb63be744f1d87c68229342ffadbe998"
|
||||
}
|
||||
Generated
-37
@@ -1,37 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n mod_id,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n -- only project revenue is counted here\n -- for affiliate code revenue, see `affiliate_code_revenue`\n payouts_values.mod_id IS NOT NULL\n AND payouts_values.mod_id = ANY($4)\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, mod_id",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "bucket",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "mod_id",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "amount_sum",
|
||||
"type_info": "Numeric"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int4",
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
true,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "8d38218e5a0c9297be7c6c77acf40a2339b12ff15f1f9e53a27a1c599a33e43b"
|
||||
}
|
||||
Generated
-38
@@ -1,38 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM usa.created_at)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n CASE WHEN $5 THEN affiliate_code ELSE 0 END AS affiliate_code,\n COUNT(*) AS conversions\n FROM users_subscriptions_affiliations usa\n INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code\n INNER JOIN users_subscriptions us ON us.id = usa.subscription_id\n INNER JOIN charges c ON c.subscription_id = us.id\n WHERE\n ac.affiliate = $4\n AND usa.created_at BETWEEN $1 AND $2\n AND c.status = 'succeeded'\n GROUP BY bucket, affiliate_code",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "bucket",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "affiliate_code",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "conversions",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int4",
|
||||
"Int8",
|
||||
"Bool"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "9152c0d7e7f508491b601c16c6eed05e2333475e96007180acda6086ee2825c0"
|
||||
}
|
||||
Generated
+22
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT m.id\n FROM mods m\n WHERE m.organization_id = ANY($1)\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int8Array"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "9cabb8fd373e6ebf76e6d6a6711e83b71b766d64e05cecbfab58194eff89ec08"
|
||||
}
|
||||
Generated
-38
@@ -1,38 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS bucket,\n CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS affiliate_code_source,\n SUM(amount) amount_sum\n FROM payouts_values\n WHERE\n user_id = $4\n AND payouts_values.affiliate_code_source IS NOT NULL\n AND created BETWEEN $1 AND $2\n GROUP BY bucket, affiliate_code_source",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "bucket",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "affiliate_code_source",
|
||||
"type_info": "Int8"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "amount_sum",
|
||||
"type_info": "Numeric"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Timestamptz",
|
||||
"Timestamptz",
|
||||
"Int4",
|
||||
"Int8",
|
||||
"Bool"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "eeea6cad39d645d3f5a0a4115c8350e08b7850a09a86c62d0de371a1caed7c07"
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,418 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use actix_web::{HttpRequest, post, web};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::{DownloadSource, GetRequest, normalize_download_source};
|
||||
use crate::{
|
||||
auth::get_user_from_headers,
|
||||
database::{
|
||||
PgPool,
|
||||
models::{DBProjectId, DBUser, DBVersionId},
|
||||
redis::RedisPool,
|
||||
},
|
||||
models::{ids::VersionId, pats::Scopes, v3::analytics::DownloadReason},
|
||||
queue::session::AuthQueue,
|
||||
routes::ApiError,
|
||||
};
|
||||
|
||||
pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) {
|
||||
cfg.service(fetch_facets);
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, utoipa::ToSchema)]
|
||||
pub struct FacetsResponse {
|
||||
pub facets: AnalyticsFacets,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, utoipa::ToSchema)]
|
||||
pub struct AnalyticsFacets {
|
||||
pub project_views: ProjectViewsFacets,
|
||||
pub project_downloads: ProjectDownloadsFacets,
|
||||
pub project_playtime: ProjectPlaytimeFacets,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, utoipa::ToSchema)]
|
||||
pub struct ProjectViewsFacets {
|
||||
pub domain: Vec<String>,
|
||||
pub site_path: Vec<String>,
|
||||
pub monetized: Vec<bool>,
|
||||
pub country: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, utoipa::ToSchema)]
|
||||
pub struct ProjectDownloadsFacets {
|
||||
pub domain: Vec<String>,
|
||||
pub user_agent: Vec<DownloadSource>,
|
||||
pub version_id: Vec<VersionId>,
|
||||
pub monetized: Vec<bool>,
|
||||
pub country: Vec<String>,
|
||||
pub reason: Vec<DownloadReason>,
|
||||
pub game_version: Vec<String>,
|
||||
pub loader: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, utoipa::ToSchema)]
|
||||
pub struct ProjectPlaytimeFacets {
|
||||
pub version_id: Vec<VersionId>,
|
||||
pub loader: Vec<String>,
|
||||
pub game_version: Vec<String>,
|
||||
pub country: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct StringFacetRow {
|
||||
value: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct VersionFacetRow {
|
||||
value: DBVersionId,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct BoolFacetRow {
|
||||
value: bool,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
responses((status = OK, body = inline(FacetsResponse))),
|
||||
)]
|
||||
#[post("/facets")]
|
||||
pub async fn fetch_facets(
|
||||
http_req: HttpRequest,
|
||||
req: web::Json<GetRequest>,
|
||||
pool: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
clickhouse: web::Data<clickhouse::Client>,
|
||||
) -> Result<web::Json<FacetsResponse>, ApiError> {
|
||||
let user = get_user_from_headers(
|
||||
&http_req,
|
||||
&**pool,
|
||||
&redis,
|
||||
&session_queue,
|
||||
Scopes::ANALYTICS,
|
||||
)
|
||||
.await?
|
||||
.1;
|
||||
|
||||
let project_ids = if req.project_ids.is_empty() {
|
||||
DBUser::get_projects(user.id.into(), &**pool, &redis).await?
|
||||
} else {
|
||||
req.project_ids
|
||||
.iter()
|
||||
.map(|id| DBProjectId::from(*id))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
let project_ids =
|
||||
super::filter_allowed_project_ids(&project_ids, &user, &pool, &redis)
|
||||
.await?;
|
||||
|
||||
let parent_version_ids =
|
||||
fetch_project_version_ids(&project_ids, &pool).await?;
|
||||
|
||||
Ok(web::Json(FacetsResponse {
|
||||
facets: AnalyticsFacets {
|
||||
project_views: fetch_project_views_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
)
|
||||
.await?,
|
||||
project_downloads: fetch_project_downloads_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
)
|
||||
.await?,
|
||||
project_playtime: fetch_project_playtime_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&parent_version_ids,
|
||||
)
|
||||
.await?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
async fn fetch_project_version_ids(
|
||||
project_ids: &[DBProjectId],
|
||||
pool: &PgPool,
|
||||
) -> Result<Vec<DBVersionId>, ApiError> {
|
||||
let project_id_values =
|
||||
project_ids.iter().map(|id| id.0).collect::<Vec<_>>();
|
||||
Ok(sqlx::query!(
|
||||
"
|
||||
SELECT id
|
||||
FROM versions
|
||||
WHERE mod_id = ANY($1)
|
||||
",
|
||||
&project_id_values,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| DBVersionId(row.id))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn fetch_project_views_facets(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
) -> Result<ProjectViewsFacets, ApiError> {
|
||||
Ok(ProjectViewsFacets {
|
||||
domain: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT domain AS value FROM views WHERE project_id IN {project_ids: Array(UInt64)} AND domain != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
site_path: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT site_path AS value FROM views WHERE project_id IN {project_ids: Array(UInt64)} AND site_path != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
monetized: fetch_bool_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT monetized AS value FROM views WHERE project_id IN {project_ids: Array(UInt64)} ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
country: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT country AS value FROM views WHERE project_id IN {project_ids: Array(UInt64)} AND country != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn fetch_project_downloads_facets(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
) -> Result<ProjectDownloadsFacets, ApiError> {
|
||||
let user_agents = fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT user_agent AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND user_agent != ''",
|
||||
project_ids,
|
||||
)
|
||||
.await?;
|
||||
let user_agent = normalize_download_source_facets(&user_agents);
|
||||
|
||||
Ok(ProjectDownloadsFacets {
|
||||
domain: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT domain AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND domain != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
user_agent,
|
||||
version_id: fetch_version_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT version_id AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND version_id != 0 ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
monetized: fetch_bool_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT user_id != 0 AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
country: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT country AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND country != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
reason: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT reason AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND reason != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|reason| reason.parse().ok())
|
||||
.collect(),
|
||||
game_version: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT game_version AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND game_version != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
loader: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT DISTINCT loader AS value FROM downloads WHERE project_id IN {project_ids: Array(UInt64)} AND loader != '' ORDER BY value",
|
||||
project_ids,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
fn normalize_download_source_facets(
|
||||
user_agents: &[String],
|
||||
) -> Vec<DownloadSource> {
|
||||
user_agents
|
||||
.iter()
|
||||
.filter_map(|user_agent| normalize_download_source(user_agent))
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn fetch_project_playtime_facets(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
parent_version_ids: &[DBVersionId],
|
||||
) -> Result<ProjectPlaytimeFacets, ApiError> {
|
||||
Ok(ProjectPlaytimeFacets {
|
||||
version_id: fetch_playtime_version_facet(
|
||||
clickhouse,
|
||||
project_ids,
|
||||
parent_version_ids,
|
||||
)
|
||||
.await?,
|
||||
loader: fetch_playtime_string_facet(
|
||||
clickhouse,
|
||||
"loader",
|
||||
project_ids,
|
||||
parent_version_ids,
|
||||
)
|
||||
.await?,
|
||||
game_version: fetch_playtime_string_facet(
|
||||
clickhouse,
|
||||
"game_version",
|
||||
project_ids,
|
||||
parent_version_ids,
|
||||
)
|
||||
.await?,
|
||||
country: fetch_playtime_string_facet(
|
||||
clickhouse,
|
||||
"country",
|
||||
project_ids,
|
||||
parent_version_ids,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn fetch_string_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
query: &str,
|
||||
project_ids: &[DBProjectId],
|
||||
) -> Result<Vec<String>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(query)
|
||||
.param("project_ids", project_ids)
|
||||
.fetch::<StringFacetRow>()?;
|
||||
let mut values = Vec::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
values.push(row.value);
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_version_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
query: &str,
|
||||
project_ids: &[DBProjectId],
|
||||
) -> Result<Vec<VersionId>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(query)
|
||||
.param("project_ids", project_ids)
|
||||
.fetch::<VersionFacetRow>()?;
|
||||
let mut values = Vec::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
values.push(row.value.into());
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_bool_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
query: &str,
|
||||
project_ids: &[DBProjectId],
|
||||
) -> Result<Vec<bool>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(query)
|
||||
.param("project_ids", project_ids)
|
||||
.fetch::<BoolFacetRow>()?;
|
||||
let mut values = Vec::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
values.push(row.value);
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_playtime_string_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
column: &str,
|
||||
project_ids: &[DBProjectId],
|
||||
parent_version_ids: &[DBVersionId],
|
||||
) -> Result<Vec<String>, ApiError> {
|
||||
let query = format!(
|
||||
"SELECT DISTINCT {column} AS value
|
||||
FROM playtime
|
||||
WHERE (project_id IN {{project_ids: Array(UInt64)}} OR parent IN {{parent_version_ids: Array(UInt64)}})
|
||||
AND {column} != ''
|
||||
ORDER BY value"
|
||||
);
|
||||
let mut rows = clickhouse
|
||||
.query(&query)
|
||||
.param("project_ids", project_ids)
|
||||
.param("parent_version_ids", parent_version_ids)
|
||||
.fetch::<StringFacetRow>()?;
|
||||
let mut values = Vec::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
values.push(row.value);
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_playtime_version_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
parent_version_ids: &[DBVersionId],
|
||||
) -> Result<Vec<VersionId>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(
|
||||
"SELECT DISTINCT version_id AS value
|
||||
FROM playtime
|
||||
WHERE (project_id IN {project_ids: Array(UInt64)} OR parent IN {parent_version_ids: Array(UInt64)})
|
||||
AND version_id != 0
|
||||
ORDER BY value",
|
||||
)
|
||||
.param("project_ids", project_ids)
|
||||
.param("parent_version_ids", parent_version_ids)
|
||||
.fetch::<VersionFacetRow>()?;
|
||||
let mut values = Vec::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
values.push(row.value.into());
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn user_agent_facets_use_normalized_sources() {
|
||||
let user_agents = vec![
|
||||
"MultiMC/5.0".to_string(),
|
||||
"MultiMC/6.0".to_string(),
|
||||
"PrismLauncher/6.1".to_string(),
|
||||
"curl/8.7.1".to_string(),
|
||||
"Mozilla/5.0 AppleWebKit/537.36".to_string(),
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
normalize_download_source_facets(&user_agents),
|
||||
vec![
|
||||
DownloadSource::Named("MultiMC".into()),
|
||||
DownloadSource::Named("Prism Launcher".into()),
|
||||
DownloadSource::Website,
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
use const_format::formatcp;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
database::models::DBAffiliateCodeId, models::ids::AffiliateCodeId,
|
||||
routes::ApiError,
|
||||
};
|
||||
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, ClickhouseQueryParams, QueryClickhouseContext,
|
||||
query_clickhouse,
|
||||
};
|
||||
use super::{
|
||||
AffiliateCodeAnalytics, AffiliateCodeMetrics, AnalyticsData, Metrics,
|
||||
};
|
||||
|
||||
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
|
||||
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
|
||||
const TIME_SLICES: &str = "{time_slices: UInt64}";
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::affiliate_code_clicks`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AffiliateCodeClicksField {
|
||||
/// Affiliate code ID.
|
||||
AffiliateCodeId,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::affiliate_code_clicks`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeClicksFilters {
|
||||
/// Affiliate code IDs to include.
|
||||
#[serde(default)]
|
||||
pub affiliate_code_id: Vec<AffiliateCodeId>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::affiliate_code_clicks`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeClicks {
|
||||
/// Total clicks for this bucket.
|
||||
pub clicks: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct AffiliateCodeClickRow {
|
||||
bucket: u64,
|
||||
affiliate_code_id: DBAffiliateCodeId,
|
||||
clicks: u64,
|
||||
}
|
||||
|
||||
const AFFILIATE_CODE_CLICKS: &str = {
|
||||
const USE_AFFILIATE_CODE_ID: &str = "{use_affiliate_code_id: Bool}";
|
||||
const FILTER_AFFILIATE_CODE_ID: &str =
|
||||
"{filter_affiliate_code_id: Array(UInt64)}";
|
||||
|
||||
formatcp!(
|
||||
"SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
if({USE_AFFILIATE_CODE_ID}, affiliate_code_id, 0) AS affiliate_code_id,
|
||||
COUNT(*) AS clicks
|
||||
FROM affiliate_code_clicks
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
-- make sure that the REAL affiliate code id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `affiliate_code_clicks.affiliate_code_id` instead of `project_id`
|
||||
AND (empty({FILTER_AFFILIATE_CODE_ID}) OR affiliate_code_id IN {FILTER_AFFILIATE_CODE_ID})
|
||||
GROUP BY bucket, affiliate_code_id"
|
||||
)
|
||||
};
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
cx: &mut QueryClickhouseContext<'_>,
|
||||
metrics: &Metrics<AffiliateCodeClicksField, AffiliateCodeClicksFilters>,
|
||||
) -> Result<(), ApiError> {
|
||||
use AffiliateCodeClicksField as F;
|
||||
let uses = |field| metrics.bucket_by.contains(&field);
|
||||
|
||||
query_clickhouse::<AffiliateCodeClickRow>(
|
||||
cx,
|
||||
AFFILIATE_CODE_CLICKS,
|
||||
ClickhouseQueryParams::empty(),
|
||||
&[("use_affiliate_code_id", uses(F::AffiliateCodeId))],
|
||||
vec![ClickhouseFilterParam::AffiliateCodeId(
|
||||
"filter_affiliate_code_id",
|
||||
&metrics.filter_by.affiliate_code_id,
|
||||
)],
|
||||
|_| true,
|
||||
|row| row.bucket,
|
||||
|row| {
|
||||
AnalyticsData::AffiliateCode(AffiliateCodeAnalytics {
|
||||
source_affiliate_code: row.affiliate_code_id.into(),
|
||||
metrics: AffiliateCodeMetrics::Clicks(AffiliateCodeClicks {
|
||||
clicks: row.clicks,
|
||||
}),
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
use futures::StreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::{
|
||||
database::{
|
||||
PgPool,
|
||||
models::{DBAffiliateCodeId, DBUserId},
|
||||
},
|
||||
models::ids::AffiliateCodeId,
|
||||
routes::ApiError,
|
||||
util::error::Context,
|
||||
};
|
||||
|
||||
use super::super::{TimeSlice, add_to_time_slice};
|
||||
use super::{
|
||||
AffiliateCodeAnalytics, AffiliateCodeMetrics, AnalyticsData, Metrics,
|
||||
};
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::affiliate_code_conversions`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AffiliateCodeConversionsField {
|
||||
/// Affiliate code ID.
|
||||
AffiliateCodeId,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::affiliate_code_conversions`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeConversionsFilters {
|
||||
/// Affiliate code IDs to include.
|
||||
#[serde(default)]
|
||||
pub affiliate_code_id: Vec<AffiliateCodeId>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::affiliate_code_conversions`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeConversions {
|
||||
/// Total conversions for this bucket.
|
||||
pub conversions: u64,
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
pool: &PgPool,
|
||||
time_slices: &mut [TimeSlice],
|
||||
req: &super::super::GetRequest,
|
||||
user_id: DBUserId,
|
||||
num_time_slices: usize,
|
||||
metrics: &Metrics<
|
||||
AffiliateCodeConversionsField,
|
||||
AffiliateCodeConversionsFilters,
|
||||
>,
|
||||
) -> Result<(), ApiError> {
|
||||
let filter_affiliate_code_ids = metrics
|
||||
.filter_by
|
||||
.affiliate_code_id
|
||||
.iter()
|
||||
.map(|id| DBAffiliateCodeId::from(*id).0)
|
||||
.collect::<Vec<_>>();
|
||||
let mut rows = sqlx::query(
|
||||
"SELECT
|
||||
WIDTH_BUCKET(
|
||||
EXTRACT(EPOCH FROM usa.created_at)::bigint,
|
||||
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
$3::integer
|
||||
) AS bucket,
|
||||
CASE WHEN $5 THEN affiliate_code ELSE 0 END AS affiliate_code,
|
||||
COUNT(*) AS conversions
|
||||
FROM users_subscriptions_affiliations usa
|
||||
INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code
|
||||
INNER JOIN users_subscriptions us ON us.id = usa.subscription_id
|
||||
INNER JOIN charges c ON c.subscription_id = us.id
|
||||
WHERE
|
||||
ac.affiliate = $4
|
||||
AND usa.created_at BETWEEN $1 AND $2
|
||||
AND c.status = 'succeeded'
|
||||
AND (cardinality($6::bigint[]) = 0 OR affiliate_code = ANY($6))
|
||||
GROUP BY bucket, affiliate_code",
|
||||
)
|
||||
.bind(req.time_range.start)
|
||||
.bind(req.time_range.end)
|
||||
.bind(num_time_slices as i64)
|
||||
.bind(user_id as DBUserId)
|
||||
.bind(
|
||||
metrics
|
||||
.bucket_by
|
||||
.contains(&AffiliateCodeConversionsField::AffiliateCodeId),
|
||||
)
|
||||
.bind(&filter_affiliate_code_ids)
|
||||
.fetch(pool);
|
||||
while let Some(row) = rows.next().await.transpose()? {
|
||||
let bucket = row
|
||||
.try_get::<Option<i32>, _>("bucket")?
|
||||
.wrap_internal_err("bucket should be non-null - query bug!")?;
|
||||
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
|
||||
eyre::eyre!(
|
||||
"bucket value {bucket} does not fit into `usize` - query bug!"
|
||||
)
|
||||
})?;
|
||||
|
||||
let affiliate_code = row.try_get::<Option<i64>, _>("affiliate_code")?;
|
||||
let conversion_count = row.try_get::<Option<i64>, _>("conversions")?;
|
||||
let source_affiliate_code = AffiliateCodeId::from(DBAffiliateCodeId(
|
||||
affiliate_code.unwrap_or_default(),
|
||||
));
|
||||
let conversions = u64::try_from(conversion_count.unwrap_or_default())
|
||||
.unwrap_or(u64::MAX);
|
||||
|
||||
add_to_time_slice(
|
||||
time_slices,
|
||||
bucket,
|
||||
AnalyticsData::AffiliateCode(AffiliateCodeAnalytics {
|
||||
source_affiliate_code,
|
||||
metrics: AffiliateCodeMetrics::Conversions(
|
||||
AffiliateCodeConversions { conversions },
|
||||
),
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
use futures::StreamExt;
|
||||
use rust_decimal::Decimal;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::{
|
||||
database::{
|
||||
PgPool,
|
||||
models::{DBAffiliateCodeId, DBUserId},
|
||||
},
|
||||
models::ids::AffiliateCodeId,
|
||||
routes::ApiError,
|
||||
util::error::Context,
|
||||
};
|
||||
|
||||
use super::super::{TimeSlice, add_to_time_slice};
|
||||
use super::{
|
||||
AffiliateCodeAnalytics, AffiliateCodeMetrics, AnalyticsData, Metrics,
|
||||
};
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::affiliate_code_revenue`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum AffiliateCodeRevenueField {
|
||||
/// Affiliate code ID.
|
||||
AffiliateCodeId,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::affiliate_code_revenue`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeRevenueFilters {
|
||||
/// Affiliate code IDs to include.
|
||||
#[serde(default)]
|
||||
pub affiliate_code_id: Vec<AffiliateCodeId>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::affiliate_code_revenue`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeRevenue {
|
||||
/// Total revenue for this bucket.
|
||||
pub revenue: Decimal,
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
pool: &PgPool,
|
||||
time_slices: &mut [TimeSlice],
|
||||
req: &super::super::GetRequest,
|
||||
user_id: DBUserId,
|
||||
num_time_slices: usize,
|
||||
metrics: &Metrics<AffiliateCodeRevenueField, AffiliateCodeRevenueFilters>,
|
||||
) -> Result<(), ApiError> {
|
||||
let filter_affiliate_code_ids = metrics
|
||||
.filter_by
|
||||
.affiliate_code_id
|
||||
.iter()
|
||||
.map(|id| DBAffiliateCodeId::from(*id).0)
|
||||
.collect::<Vec<_>>();
|
||||
let mut rows = sqlx::query(
|
||||
"SELECT
|
||||
WIDTH_BUCKET(
|
||||
EXTRACT(EPOCH FROM created)::bigint,
|
||||
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
$3::integer
|
||||
) AS bucket,
|
||||
CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS affiliate_code_source,
|
||||
SUM(amount) amount_sum
|
||||
FROM payouts_values
|
||||
WHERE
|
||||
user_id = $4
|
||||
AND payouts_values.affiliate_code_source IS NOT NULL
|
||||
AND created BETWEEN $1 AND $2
|
||||
AND (cardinality($6::bigint[]) = 0 OR affiliate_code_source = ANY($6))
|
||||
GROUP BY bucket, affiliate_code_source",
|
||||
)
|
||||
.bind(req.time_range.start)
|
||||
.bind(req.time_range.end)
|
||||
.bind(num_time_slices as i64)
|
||||
.bind(user_id as DBUserId)
|
||||
.bind(
|
||||
metrics
|
||||
.bucket_by
|
||||
.contains(&AffiliateCodeRevenueField::AffiliateCodeId),
|
||||
)
|
||||
.bind(&filter_affiliate_code_ids)
|
||||
.fetch(pool);
|
||||
while let Some(row) = rows.next().await.transpose()? {
|
||||
let bucket = row
|
||||
.try_get::<Option<i32>, _>("bucket")?
|
||||
.wrap_internal_err("bucket should be non-null - query bug!")?;
|
||||
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
|
||||
eyre::eyre!(
|
||||
"bucket value {bucket} does not fit into `usize` - query bug!"
|
||||
)
|
||||
})?;
|
||||
|
||||
let affiliate_code_source =
|
||||
row.try_get::<Option<i64>, _>("affiliate_code_source")?;
|
||||
let source_affiliate_code = AffiliateCodeId::from(DBAffiliateCodeId(
|
||||
affiliate_code_source.unwrap_or_default(),
|
||||
));
|
||||
let revenue = row
|
||||
.try_get::<Option<Decimal>, _>("amount_sum")?
|
||||
.unwrap_or_default();
|
||||
|
||||
add_to_time_slice(
|
||||
time_slices,
|
||||
bucket,
|
||||
AnalyticsData::AffiliateCode(AffiliateCodeAnalytics {
|
||||
source_affiliate_code,
|
||||
metrics: AffiliateCodeMetrics::Revenue(AffiliateCodeRevenue {
|
||||
revenue,
|
||||
}),
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
mod affiliate_code_clicks;
|
||||
mod affiliate_code_conversions;
|
||||
mod affiliate_code_revenue;
|
||||
mod project_downloads;
|
||||
mod project_playtime;
|
||||
mod project_revenue;
|
||||
mod project_views;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::models::ids::{AffiliateCodeId, ProjectId};
|
||||
|
||||
pub(crate) use affiliate_code_clicks::fetch as fetch_affiliate_code_clicks;
|
||||
pub use affiliate_code_clicks::{
|
||||
AffiliateCodeClicks, AffiliateCodeClicksField, AffiliateCodeClicksFilters,
|
||||
};
|
||||
pub(crate) use affiliate_code_conversions::fetch as fetch_affiliate_code_conversions;
|
||||
pub use affiliate_code_conversions::{
|
||||
AffiliateCodeConversions, AffiliateCodeConversionsField,
|
||||
AffiliateCodeConversionsFilters,
|
||||
};
|
||||
pub(crate) use affiliate_code_revenue::fetch as fetch_affiliate_code_revenue;
|
||||
pub use affiliate_code_revenue::{
|
||||
AffiliateCodeRevenue, AffiliateCodeRevenueField,
|
||||
AffiliateCodeRevenueFilters,
|
||||
};
|
||||
pub use project_downloads::{
|
||||
DownloadSource, ProjectDownloads, ProjectDownloadsField,
|
||||
ProjectDownloadsFilters,
|
||||
};
|
||||
pub(crate) use project_downloads::{
|
||||
fetch as fetch_project_downloads, normalize_download_source,
|
||||
};
|
||||
pub(crate) use project_playtime::fetch as fetch_project_playtime;
|
||||
pub use project_playtime::{
|
||||
ProjectPlaytime, ProjectPlaytimeField, ProjectPlaytimeFilters,
|
||||
};
|
||||
pub(crate) use project_revenue::fetch as fetch_project_revenue;
|
||||
pub use project_revenue::{
|
||||
ProjectRevenue, ProjectRevenueField, ProjectRevenueFilters,
|
||||
};
|
||||
pub(crate) use project_views::fetch as fetch_project_views;
|
||||
pub use project_views::{ProjectViews, ProjectViewsField, ProjectViewsFilters};
|
||||
|
||||
/// What metrics the caller would like to receive from this analytics get
|
||||
/// request.
|
||||
#[derive(Debug, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ReturnMetrics {
|
||||
/// How many times a project page has been viewed.
|
||||
pub project_views: Option<Metrics<ProjectViewsField, ProjectViewsFilters>>,
|
||||
/// How many times a project has been downloaded.
|
||||
pub project_downloads:
|
||||
Option<Metrics<ProjectDownloadsField, ProjectDownloadsFilters>>,
|
||||
/// How long users have been playing a project.
|
||||
pub project_playtime:
|
||||
Option<Metrics<ProjectPlaytimeField, ProjectPlaytimeFilters>>,
|
||||
/// How much payout revenue a project has generated.
|
||||
pub project_revenue:
|
||||
Option<Metrics<ProjectRevenueField, ProjectRevenueFilters>>,
|
||||
/// How many times an affiliate code has been clicked.
|
||||
pub affiliate_code_clicks:
|
||||
Option<Metrics<AffiliateCodeClicksField, AffiliateCodeClicksFilters>>,
|
||||
/// How many times a product has been purchased with an affiliate code.
|
||||
pub affiliate_code_conversions: Option<
|
||||
Metrics<AffiliateCodeConversionsField, AffiliateCodeConversionsFilters>,
|
||||
>,
|
||||
/// How much payout revenue an affiliate code has generated.
|
||||
pub affiliate_code_revenue:
|
||||
Option<Metrics<AffiliateCodeRevenueField, AffiliateCodeRevenueFilters>>,
|
||||
}
|
||||
|
||||
/// See [`ReturnMetrics`].
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct Metrics<BucketBy, FilterBy> {
|
||||
/// When collecting metrics, what fields do we want to group the results by?
|
||||
///
|
||||
/// For example, if we have two views entries:
|
||||
/// - `{ "project_id": "abcdefgh", "domain": "youtube.com", "count": 5 }`
|
||||
/// - `{ "project_id": "abcdefgh", "domain": "discord.com", "count": 3 }`
|
||||
///
|
||||
/// If we bucket by `domain`, then we will get two results:
|
||||
/// - `{ "project_id": "abcdefgh", "domain": "youtube.com", "count": 5 }`
|
||||
/// - `{ "project_id": "abcdefgh", "domain": "discord.com", "count": 3 }`
|
||||
///
|
||||
/// If we do not bucket by `domain`, we will only get one, which is an
|
||||
/// aggregate of the two rows:
|
||||
/// - `{ "project_id": "abcdefgh", "count": 8 }`
|
||||
#[serde(default = "Vec::default")]
|
||||
pub bucket_by: Vec<BucketBy>,
|
||||
/// Filters to apply before aggregating this metric.
|
||||
///
|
||||
/// Values within one field are ORed together. Different fields are AND'ed
|
||||
/// together. An empty list means that field is not filtered.
|
||||
#[serde(default)]
|
||||
pub filter_by: FilterBy,
|
||||
}
|
||||
|
||||
/// Metrics collected in a single time slice.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
#[serde(untagged)] // the presence of `source_project`, `source_affiliate_code` determines the kind
|
||||
pub enum AnalyticsData {
|
||||
/// Project metrics.
|
||||
Project(ProjectAnalytics),
|
||||
AffiliateCode(AffiliateCodeAnalytics),
|
||||
}
|
||||
|
||||
/// Project metrics.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectAnalytics {
|
||||
/// What project these metrics are for.
|
||||
pub source_project: ProjectId,
|
||||
/// Metrics collected.
|
||||
#[serde(flatten)]
|
||||
pub metrics: ProjectMetrics,
|
||||
}
|
||||
|
||||
/// Project metrics of a specific kind.
|
||||
///
|
||||
/// If a field is not included in [`Metrics::bucket_by`], it will be [`None`].
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
#[serde(rename_all = "snake_case", tag = "metric_kind")]
|
||||
pub enum ProjectMetrics {
|
||||
/// [`ReturnMetrics::project_views`].
|
||||
Views(ProjectViews),
|
||||
/// [`ReturnMetrics::project_downloads`].
|
||||
Downloads(ProjectDownloads),
|
||||
/// [`ReturnMetrics::project_playtime`].
|
||||
Playtime(ProjectPlaytime),
|
||||
/// [`ReturnMetrics::project_revenue`].
|
||||
Revenue(ProjectRevenue),
|
||||
}
|
||||
|
||||
/// Affiliate code metrics.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct AffiliateCodeAnalytics {
|
||||
/// What affiliate code these metrics are for.
|
||||
pub source_affiliate_code: AffiliateCodeId,
|
||||
/// Metrics collected.
|
||||
#[serde(flatten)]
|
||||
pub metrics: AffiliateCodeMetrics,
|
||||
}
|
||||
|
||||
/// Affiliate code metrics of a specific kind.
|
||||
///
|
||||
/// If a field is not included in [`Metrics::bucket_by`], it will be [`None`].
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
#[serde(rename_all = "snake_case", tag = "metric_kind")]
|
||||
pub enum AffiliateCodeMetrics {
|
||||
Clicks(AffiliateCodeClicks),
|
||||
Conversions(AffiliateCodeConversions),
|
||||
Revenue(AffiliateCodeRevenue),
|
||||
}
|
||||
@@ -0,0 +1,487 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
LazyLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
use const_format::formatcp;
|
||||
use dashmap::DashMap;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _};
|
||||
|
||||
use crate::{
|
||||
database::models::{DBProjectId, DBVersionId},
|
||||
models::{ids::VersionId, v3::analytics::DownloadReason},
|
||||
routes::ApiError,
|
||||
};
|
||||
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
|
||||
condense_country, none_if_empty, none_if_zero_version_id,
|
||||
};
|
||||
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
|
||||
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
|
||||
const TIME_SLICES: &str = "{time_slices: UInt64}";
|
||||
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::project_downloads`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ProjectDownloadsField {
|
||||
/// Project ID.
|
||||
ProjectId,
|
||||
/// Version ID of this project.
|
||||
VersionId,
|
||||
/// Referrer domain which linked to this project.
|
||||
Domain,
|
||||
/// Normalized user agent used to download this project.
|
||||
UserAgent,
|
||||
/// Whether these downloads were monetized or not.
|
||||
Monetized,
|
||||
/// What country these downloads came from.
|
||||
///
|
||||
/// To anonymize the data, the country may be reported as `XX`.
|
||||
Country,
|
||||
/// Download reason.
|
||||
Reason,
|
||||
/// Game version used for this download.
|
||||
GameVersion,
|
||||
/// Mod loader used for this download.
|
||||
Loader,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::project_downloads`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectDownloadsFilters {
|
||||
/// Version IDs to include.
|
||||
#[serde(default)]
|
||||
pub version_id: Vec<VersionId>,
|
||||
/// Referrer domains to include.
|
||||
#[serde(default)]
|
||||
pub domain: Vec<String>,
|
||||
/// Normalized download sources to include.
|
||||
#[serde(default)]
|
||||
pub user_agent: Vec<DownloadSource>,
|
||||
/// Monetization states to include.
|
||||
#[serde(default)]
|
||||
pub monetized: Vec<bool>,
|
||||
/// Country codes to include.
|
||||
#[serde(default)]
|
||||
pub country: Vec<String>,
|
||||
/// Download reasons to include.
|
||||
#[serde(default)]
|
||||
pub reason: Vec<DownloadReason>,
|
||||
/// Game versions to include.
|
||||
#[serde(default)]
|
||||
pub game_version: Vec<String>,
|
||||
/// Loaders to include.
|
||||
#[serde(default)]
|
||||
pub loader: Vec<String>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::project_downloads`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectDownloads {
|
||||
/// [`ProjectDownloadsField::Domain`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) domain: Option<String>,
|
||||
/// [`ProjectDownloadsField::UserAgent`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) user_agent: Option<DownloadSource>,
|
||||
/// [`ProjectDownloadsField::VersionId`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) version_id: Option<VersionId>,
|
||||
/// [`ProjectDownloadsField::Monetized`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) monetized: Option<bool>,
|
||||
/// [`ProjectDownloadsField::Country`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) country: Option<String>,
|
||||
/// [`ProjectDownloadsField::Reason`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) reason: Option<DownloadReason>,
|
||||
/// [`ProjectDownloadsField::GameVersion`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) game_version: Option<String>,
|
||||
/// [`ProjectDownloadsField::Loader`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) loader: Option<String>,
|
||||
/// Total number of downloads for this bucket.
|
||||
pub(crate) downloads: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, utoipa::ToSchema)]
|
||||
pub enum DownloadSource {
|
||||
Website,
|
||||
ModrinthApp,
|
||||
ModrinthHosting,
|
||||
ModrinthMaven,
|
||||
Other,
|
||||
Named(String),
|
||||
}
|
||||
|
||||
impl Serialize for DownloadSource {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match self {
|
||||
Self::Named(name) => serializer.serialize_str(name),
|
||||
Self::Website => serializer.serialize_str("website"),
|
||||
Self::ModrinthApp => serializer.serialize_str("modrinth_app"),
|
||||
Self::ModrinthHosting => {
|
||||
serializer.serialize_str("modrinth_hosting")
|
||||
}
|
||||
Self::ModrinthMaven => serializer.serialize_str("modrinth_maven"),
|
||||
Self::Other => serializer.serialize_str("other"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for DownloadSource {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let source = String::deserialize(deserializer)?;
|
||||
Ok(match source.as_str() {
|
||||
"website" => Self::Website,
|
||||
"modrinth_app" => Self::ModrinthApp,
|
||||
"modrinth_hosting" => Self::ModrinthHosting,
|
||||
"modrinth_maven" => Self::ModrinthMaven,
|
||||
"other" => Self::Other,
|
||||
_ if !source.is_empty() => Self::Named(source),
|
||||
_ => {
|
||||
return Err(D::Error::custom(
|
||||
"download source cannot be empty",
|
||||
));
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct DownloadRow {
|
||||
bucket: u64,
|
||||
project_id: DBProjectId,
|
||||
domain: String,
|
||||
user_agent: String,
|
||||
version_id: DBVersionId,
|
||||
monetized: i8,
|
||||
country: String,
|
||||
reason: String,
|
||||
game_version: String,
|
||||
loader: String,
|
||||
downloads: u64,
|
||||
}
|
||||
|
||||
const DOWNLOADS: &str = {
|
||||
const USE_PROJECT_ID: &str = "{use_project_id: Bool}";
|
||||
const USE_DOMAIN: &str = "{use_domain: Bool}";
|
||||
const USE_USER_AGENT: &str = "{use_user_agent: Bool}";
|
||||
const USE_VERSION_ID: &str = "{use_version_id: Bool}";
|
||||
const USE_MONETIZED: &str = "{use_monetized: Bool}";
|
||||
const USE_COUNTRY: &str = "{use_country: Bool}";
|
||||
const USE_REASON: &str = "{use_reason: Bool}";
|
||||
const USE_GAME_VERSION: &str = "{use_game_version: Bool}";
|
||||
const USE_LOADER: &str = "{use_loader: Bool}";
|
||||
const FILTER_DOMAIN: &str = "{filter_domain: Array(String)}";
|
||||
const FILTER_VERSION_ID: &str = "{filter_version_id: Array(UInt64)}";
|
||||
const FILTER_MONETIZED: &str = "{filter_monetized: UInt8}";
|
||||
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
|
||||
const FILTER_REASON: &str = "{filter_reason: Array(String)}";
|
||||
const FILTER_GAME_VERSION: &str = "{filter_game_version: Array(String)}";
|
||||
const FILTER_LOADER: &str = "{filter_loader: Array(String)}";
|
||||
|
||||
formatcp!(
|
||||
"SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
if({USE_PROJECT_ID}, project_id, 0) AS project_id,
|
||||
if({USE_DOMAIN}, domain, '') AS domain,
|
||||
if({USE_USER_AGENT}, user_agent, '') AS user_agent,
|
||||
if({USE_VERSION_ID}, version_id, 0) AS version_id,
|
||||
if({USE_MONETIZED}, CAST(user_id != 0 AS Int8), -1) AS monetized,
|
||||
if({USE_COUNTRY}, country, '') AS country,
|
||||
if({USE_REASON}, reason, '') AS reason,
|
||||
if({USE_GAME_VERSION}, game_version, '') AS game_version,
|
||||
if({USE_LOADER}, loader, '') AS loader,
|
||||
COUNT(*) AS downloads
|
||||
FROM downloads
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
-- make sure that the REAL project id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `downloads.project_id` instead of `project_id`
|
||||
AND downloads.project_id IN {PROJECT_IDS}
|
||||
AND (empty({FILTER_DOMAIN}) OR downloads.domain IN {FILTER_DOMAIN})
|
||||
AND (empty({FILTER_VERSION_ID}) OR downloads.version_id IN {FILTER_VERSION_ID})
|
||||
AND ({FILTER_MONETIZED} = 2 OR CAST(downloads.user_id != 0 AS UInt8) = {FILTER_MONETIZED})
|
||||
AND (empty({FILTER_COUNTRY}) OR downloads.country IN {FILTER_COUNTRY})
|
||||
AND (empty({FILTER_REASON}) OR downloads.reason IN {FILTER_REASON})
|
||||
AND (empty({FILTER_GAME_VERSION}) OR downloads.game_version IN {FILTER_GAME_VERSION})
|
||||
AND (empty({FILTER_LOADER}) OR downloads.loader IN {FILTER_LOADER})
|
||||
GROUP BY bucket, project_id, domain, user_agent, version_id, monetized, country, reason, game_version, loader"
|
||||
)
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct DownloadBucket {
|
||||
bucket: u64,
|
||||
project_id: DBProjectId,
|
||||
domain: Option<String>,
|
||||
user_agent: Option<DownloadSource>,
|
||||
version_id: Option<DBVersionId>,
|
||||
monetized: Option<bool>,
|
||||
country: Option<String>,
|
||||
reason: Option<DownloadReason>,
|
||||
game_version: Option<String>,
|
||||
loader: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
cx: &mut QueryClickhouseContext<'_>,
|
||||
metrics: &Metrics<ProjectDownloadsField, ProjectDownloadsFilters>,
|
||||
) -> Result<(), ApiError> {
|
||||
use ProjectDownloadsField as F;
|
||||
let uses = |field| metrics.bucket_by.contains(&field);
|
||||
let use_columns = &[
|
||||
("use_project_id", uses(F::ProjectId)),
|
||||
("use_domain", uses(F::Domain)),
|
||||
(
|
||||
"use_user_agent",
|
||||
uses(F::UserAgent) || !metrics.filter_by.user_agent.is_empty(),
|
||||
),
|
||||
("use_version_id", uses(F::VersionId)),
|
||||
("use_monetized", uses(F::Monetized)),
|
||||
("use_country", uses(F::Country)),
|
||||
("use_reason", uses(F::Reason)),
|
||||
("use_game_version", uses(F::GameVersion)),
|
||||
("use_loader", uses(F::Loader)),
|
||||
];
|
||||
|
||||
let mut query = cx
|
||||
.clickhouse
|
||||
.query(DOWNLOADS)
|
||||
.param("time_range_start", cx.req.time_range.start.timestamp())
|
||||
.param("time_range_end", cx.req.time_range.end.timestamp())
|
||||
.param("time_slices", cx.time_slices.len())
|
||||
.param("project_ids", cx.project_ids);
|
||||
for (param_name, used) in use_columns {
|
||||
query = query.param(param_name, used)
|
||||
}
|
||||
for filter_param in [
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_domain",
|
||||
&metrics.filter_by.domain,
|
||||
),
|
||||
ClickhouseFilterParam::VersionId(
|
||||
"filter_version_id",
|
||||
&metrics.filter_by.version_id,
|
||||
),
|
||||
ClickhouseFilterParam::Bool(
|
||||
"filter_monetized",
|
||||
&metrics.filter_by.monetized,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_country",
|
||||
&metrics.filter_by.country,
|
||||
),
|
||||
ClickhouseFilterParam::DownloadReason(
|
||||
"filter_reason",
|
||||
&metrics.filter_by.reason,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_game_version",
|
||||
&metrics.filter_by.game_version,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_loader",
|
||||
&metrics.filter_by.loader,
|
||||
),
|
||||
] {
|
||||
query = filter_param.bind(query);
|
||||
}
|
||||
|
||||
let uses_column = |name| {
|
||||
use_columns
|
||||
.iter()
|
||||
.any(|(column_name, used)| *column_name == name && *used)
|
||||
};
|
||||
let mut cursor = query.fetch::<DownloadRow>()?;
|
||||
let mut buckets = HashMap::<DownloadBucket, u64>::new();
|
||||
|
||||
while let Some(row) = cursor.next().await? {
|
||||
let normalized_source = normalize_download_source(&row.user_agent);
|
||||
if !metrics.filter_by.user_agent.is_empty()
|
||||
&& !normalized_source.as_ref().is_some_and(|source| {
|
||||
metrics.filter_by.user_agent.contains(source)
|
||||
})
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = DownloadBucket {
|
||||
bucket: row.bucket,
|
||||
project_id: row.project_id,
|
||||
domain: uses_column("use_domain").then(|| row.domain.clone()),
|
||||
user_agent: uses(F::UserAgent)
|
||||
.then_some(normalized_source)
|
||||
.flatten(),
|
||||
version_id: uses_column("use_version_id").then_some(row.version_id),
|
||||
monetized: if uses_column("use_monetized") {
|
||||
match row.monetized {
|
||||
0 => Some(false),
|
||||
1 => Some(true),
|
||||
_ => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
},
|
||||
country: uses_column("use_country").then(|| row.country.clone()),
|
||||
reason: if uses_column("use_reason") {
|
||||
none_if_empty(row.reason.clone()).and_then(|s| s.parse().ok())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
game_version: uses_column("use_game_version")
|
||||
.then(|| row.game_version.clone()),
|
||||
loader: uses_column("use_loader").then(|| row.loader.clone()),
|
||||
};
|
||||
|
||||
*buckets.entry(key).or_default() += row.downloads;
|
||||
}
|
||||
|
||||
for (key, downloads) in buckets {
|
||||
add_to_time_slice(
|
||||
cx.time_slices,
|
||||
key.bucket as usize,
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: key.project_id.into(),
|
||||
metrics: ProjectMetrics::Downloads(ProjectDownloads {
|
||||
domain: key.domain.and_then(none_if_empty),
|
||||
user_agent: key.user_agent,
|
||||
version_id: key
|
||||
.version_id
|
||||
.and_then(none_if_zero_version_id),
|
||||
monetized: key.monetized,
|
||||
country: key
|
||||
.country
|
||||
.map(|country| condense_country(country, downloads)),
|
||||
reason: key.reason,
|
||||
game_version: key.game_version.and_then(none_if_empty),
|
||||
loader: key.loader.and_then(none_if_empty),
|
||||
downloads,
|
||||
}),
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum DownloadSourcePattern {
|
||||
Named(&'static str),
|
||||
Website,
|
||||
ModrinthApp,
|
||||
ModrinthHosting,
|
||||
ModrinthMaven,
|
||||
}
|
||||
|
||||
impl DownloadSourcePattern {
|
||||
fn into_source(self) -> DownloadSource {
|
||||
match self {
|
||||
Self::Named(name) => DownloadSource::Named(name.into()),
|
||||
Self::Website => DownloadSource::Website,
|
||||
Self::ModrinthApp => DownloadSource::ModrinthApp,
|
||||
Self::ModrinthHosting => DownloadSource::ModrinthHosting,
|
||||
Self::ModrinthMaven => DownloadSource::ModrinthMaven,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static DOWNLOAD_SOURCE_PATTERNS: LazyLock<Vec<(Regex, DownloadSourcePattern)>> =
|
||||
LazyLock::new(|| {
|
||||
use DownloadSourcePattern as P;
|
||||
|
||||
[
|
||||
(r"^modrinth/kyros/", P::ModrinthHosting),
|
||||
(r"^modrinth/theseus/", P::ModrinthApp),
|
||||
(r"^(Gradle/|Apache-Maven/)", P::ModrinthMaven),
|
||||
(r"^MultiMC/", P::Named("MultiMC")),
|
||||
(r"^PrismLauncher/", P::Named("Prism Launcher")),
|
||||
(r"^PolyMC/", P::Named("PolyMC")),
|
||||
(r"^FCL/", P::Named("FCL")),
|
||||
(r"^PCL2/", P::Named("PCL2")),
|
||||
(r"^HMCL/", P::Named("HMCL")),
|
||||
(r"^Lunar Client Launcher", P::Named("Lunar Client")),
|
||||
(r"^PojavLauncher", P::Named("PojavLauncher")),
|
||||
(r"^ATLauncher/", P::Named("ATLauncher")),
|
||||
(r"FeatherLauncher/", P::Named("Feather Client")),
|
||||
(
|
||||
r"^FeatherMC/Feather Client Rust Launcher/",
|
||||
P::Named("Feather Client"),
|
||||
),
|
||||
(r"Feather/[0-9A-Za-z]+", P::Named("Feather Client")),
|
||||
(r"^PandoraLauncher/", P::Named("Pandora Launcher")),
|
||||
(r"^unsup", P::Named("unsup")),
|
||||
(r"nothub/mrpack-install", P::Named("mrpack-install")),
|
||||
(r"^(packwiz-installer|packwiz/)", P::Named("Packwiz")),
|
||||
(
|
||||
r"^(Mozilla/|Chrome/|Chromium/|Firefox/|Safari/|AppleWebKit/|Edg/|OPR/)",
|
||||
P::Website,
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(pattern, source)| {
|
||||
(
|
||||
Regex::new(pattern)
|
||||
.expect("download source regex should be valid"),
|
||||
source,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
|
||||
const MAX_DOWNLOAD_SOURCE_CACHE_BYTES: usize = 100 * 1024 * 1024;
|
||||
|
||||
static DOWNLOAD_SOURCE_CACHE: LazyLock<
|
||||
DashMap<String, Option<DownloadSource>>,
|
||||
> = LazyLock::new(DashMap::new);
|
||||
|
||||
static DOWNLOAD_SOURCE_CACHE_BYTES: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub(crate) fn normalize_download_source(
|
||||
user_agent: &str,
|
||||
) -> Option<DownloadSource> {
|
||||
if let Some(source) = DOWNLOAD_SOURCE_CACHE.get(user_agent) {
|
||||
return source.clone();
|
||||
}
|
||||
|
||||
let source = normalize_download_source_uncached(user_agent);
|
||||
|
||||
let key_bytes = user_agent.len();
|
||||
let previous_bytes =
|
||||
DOWNLOAD_SOURCE_CACHE_BYTES.fetch_add(key_bytes, Ordering::Relaxed);
|
||||
if previous_bytes + key_bytes <= MAX_DOWNLOAD_SOURCE_CACHE_BYTES {
|
||||
DOWNLOAD_SOURCE_CACHE.insert(user_agent.to_owned(), source.clone());
|
||||
} else {
|
||||
DOWNLOAD_SOURCE_CACHE_BYTES.fetch_sub(key_bytes, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
source
|
||||
}
|
||||
|
||||
fn normalize_download_source_uncached(
|
||||
user_agent: &str,
|
||||
) -> Option<DownloadSource> {
|
||||
DOWNLOAD_SOURCE_PATTERNS.iter().find_map(|(regex, source)| {
|
||||
regex.is_match(user_agent).then(|| source.into_source())
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,266 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use const_format::formatcp;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
database::models::{DBProjectId, DBVersionId},
|
||||
models::ids::VersionId,
|
||||
routes::ApiError,
|
||||
};
|
||||
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
|
||||
condense_country, none_if_empty, none_if_zero_version_id,
|
||||
};
|
||||
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
|
||||
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
|
||||
const TIME_SLICES: &str = "{time_slices: UInt64}";
|
||||
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::project_playtime`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ProjectPlaytimeField {
|
||||
/// Project ID.
|
||||
ProjectId,
|
||||
/// Version ID of this project.
|
||||
VersionId,
|
||||
/// Game mod loader which was used to count this playtime, e.g. Fabric.
|
||||
Loader,
|
||||
/// Game version which this project was played on.
|
||||
GameVersion,
|
||||
/// What country this playtime came from.
|
||||
///
|
||||
/// To anonymize the data, the country may be reported as `XX`.
|
||||
Country,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::project_playtime`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectPlaytimeFilters {
|
||||
/// Version IDs to include.
|
||||
#[serde(default)]
|
||||
pub version_id: Vec<VersionId>,
|
||||
/// Loaders to include.
|
||||
#[serde(default)]
|
||||
pub loader: Vec<String>,
|
||||
/// Game versions to include.
|
||||
#[serde(default)]
|
||||
pub game_version: Vec<String>,
|
||||
/// Country codes to include.
|
||||
#[serde(default)]
|
||||
pub country: Vec<String>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::project_playtime`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectPlaytime {
|
||||
/// [`ProjectPlaytimeField::VersionId`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) version_id: Option<VersionId>,
|
||||
/// [`ProjectPlaytimeField::Loader`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) loader: Option<String>,
|
||||
/// [`ProjectPlaytimeField::GameVersion`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) game_version: Option<String>,
|
||||
/// [`ProjectPlaytimeField::Country`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) country: Option<String>,
|
||||
/// Total number of seconds of playtime for this bucket.
|
||||
pub(crate) seconds: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct PlaytimeRow {
|
||||
bucket: u64,
|
||||
project_id: DBProjectId,
|
||||
parent_version_id: DBVersionId,
|
||||
version_id: DBVersionId,
|
||||
loader: String,
|
||||
game_version: String,
|
||||
country: String,
|
||||
seconds: u64,
|
||||
}
|
||||
|
||||
const PLAYTIME: &str = {
|
||||
const USE_PROJECT_ID: &str = "{use_project_id: Bool}";
|
||||
const USE_VERSION_ID: &str = "{use_version_id: Bool}";
|
||||
const USE_LOADER: &str = "{use_loader: Bool}";
|
||||
const USE_GAME_VERSION: &str = "{use_game_version: Bool}";
|
||||
const USE_COUNTRY: &str = "{use_country: Bool}";
|
||||
const PARENT_VERSION_IDS: &str = "{parent_version_ids: Array(UInt64)}";
|
||||
const FILTER_VERSION_ID: &str = "{filter_version_id: Array(UInt64)}";
|
||||
const FILTER_LOADER: &str = "{filter_loader: Array(String)}";
|
||||
const FILTER_GAME_VERSION: &str = "{filter_game_version: Array(String)}";
|
||||
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
|
||||
|
||||
formatcp!(
|
||||
"SELECT
|
||||
bucket,
|
||||
if({USE_PROJECT_ID}, source_project_id, 0) AS project_id,
|
||||
parent_version_id,
|
||||
version_id,
|
||||
loader,
|
||||
game_version,
|
||||
country,
|
||||
SUM(seconds) AS seconds
|
||||
FROM (
|
||||
SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
project_id AS source_project_id,
|
||||
0 AS parent_version_id,
|
||||
if({USE_VERSION_ID}, version_id, 0) AS version_id,
|
||||
if({USE_LOADER}, loader, '') AS loader,
|
||||
if({USE_GAME_VERSION}, game_version, '') AS game_version,
|
||||
if({USE_COUNTRY}, country, '') AS country,
|
||||
seconds
|
||||
FROM playtime
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
AND playtime.project_id IN {PROJECT_IDS}
|
||||
AND (empty({FILTER_VERSION_ID}) OR playtime.version_id IN {FILTER_VERSION_ID})
|
||||
AND (empty({FILTER_LOADER}) OR playtime.loader IN {FILTER_LOADER})
|
||||
AND (empty({FILTER_GAME_VERSION}) OR playtime.game_version IN {FILTER_GAME_VERSION})
|
||||
AND (empty({FILTER_COUNTRY}) OR playtime.country IN {FILTER_COUNTRY})
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
0 AS source_project_id,
|
||||
parent AS parent_version_id,
|
||||
if({USE_VERSION_ID}, version_id, 0) AS version_id,
|
||||
if({USE_LOADER}, loader, '') AS loader,
|
||||
if({USE_GAME_VERSION}, game_version, '') AS game_version,
|
||||
if({USE_COUNTRY}, country, '') AS country,
|
||||
seconds
|
||||
FROM playtime
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
AND parent IN {PARENT_VERSION_IDS}
|
||||
AND (empty({FILTER_VERSION_ID}) OR playtime.version_id IN {FILTER_VERSION_ID})
|
||||
AND (empty({FILTER_LOADER}) OR playtime.loader IN {FILTER_LOADER})
|
||||
AND (empty({FILTER_GAME_VERSION}) OR playtime.game_version IN {FILTER_GAME_VERSION})
|
||||
AND (empty({FILTER_COUNTRY}) OR playtime.country IN {FILTER_COUNTRY})
|
||||
)
|
||||
GROUP BY bucket, project_id, parent_version_id, version_id, loader, game_version, country"
|
||||
)
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct PlaytimeBucket {
|
||||
bucket: u64,
|
||||
project_id: DBProjectId,
|
||||
version_id: Option<DBVersionId>,
|
||||
loader: Option<String>,
|
||||
game_version: Option<String>,
|
||||
country: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
cx: &mut QueryClickhouseContext<'_>,
|
||||
parent_version_projects: &HashMap<DBVersionId, DBProjectId>,
|
||||
metrics: &Metrics<ProjectPlaytimeField, ProjectPlaytimeFilters>,
|
||||
) -> Result<(), ApiError> {
|
||||
use ProjectPlaytimeField as F;
|
||||
let uses = |field| metrics.bucket_by.contains(&field);
|
||||
let use_columns = &[
|
||||
("use_project_id", uses(F::ProjectId)),
|
||||
("use_version_id", uses(F::VersionId)),
|
||||
("use_loader", uses(F::Loader)),
|
||||
("use_game_version", uses(F::GameVersion)),
|
||||
("use_country", uses(F::Country)),
|
||||
];
|
||||
let uses_column = |name| {
|
||||
use_columns
|
||||
.iter()
|
||||
.any(|(column_name, used)| *column_name == name && *used)
|
||||
};
|
||||
|
||||
let mut query = cx
|
||||
.clickhouse
|
||||
.query(PLAYTIME)
|
||||
.param("time_range_start", cx.req.time_range.start.timestamp())
|
||||
.param("time_range_end", cx.req.time_range.end.timestamp())
|
||||
.param("time_slices", cx.time_slices.len())
|
||||
.param("project_ids", cx.project_ids)
|
||||
.param("parent_version_ids", cx.parent_version_ids);
|
||||
for (param_name, used) in use_columns {
|
||||
query = query.param(param_name, used)
|
||||
}
|
||||
for filter_param in [
|
||||
ClickhouseFilterParam::VersionId(
|
||||
"filter_version_id",
|
||||
&metrics.filter_by.version_id,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_loader",
|
||||
&metrics.filter_by.loader,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_game_version",
|
||||
&metrics.filter_by.game_version,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_country",
|
||||
&metrics.filter_by.country,
|
||||
),
|
||||
] {
|
||||
query = filter_param.bind(query);
|
||||
}
|
||||
|
||||
let mut cursor = query.fetch::<PlaytimeRow>()?;
|
||||
let mut buckets = HashMap::<PlaytimeBucket, u64>::new();
|
||||
|
||||
while let Some(row) = cursor.next().await? {
|
||||
let project_id =
|
||||
if uses_column("use_project_id") && row.project_id.0 == 0 {
|
||||
parent_version_projects
|
||||
.get(&row.parent_version_id)
|
||||
.copied()
|
||||
.unwrap_or(row.project_id)
|
||||
} else {
|
||||
row.project_id
|
||||
};
|
||||
let key = PlaytimeBucket {
|
||||
bucket: row.bucket,
|
||||
project_id,
|
||||
version_id: uses_column("use_version_id").then_some(row.version_id),
|
||||
loader: uses_column("use_loader").then(|| row.loader.clone()),
|
||||
game_version: uses_column("use_game_version")
|
||||
.then(|| row.game_version.clone()),
|
||||
country: uses_column("use_country").then(|| row.country.clone()),
|
||||
};
|
||||
|
||||
*buckets.entry(key).or_default() += row.seconds;
|
||||
}
|
||||
|
||||
for (key, seconds) in buckets {
|
||||
add_to_time_slice(
|
||||
cx.time_slices,
|
||||
key.bucket as usize,
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: key.project_id.into(),
|
||||
metrics: ProjectMetrics::Playtime(ProjectPlaytime {
|
||||
version_id: key
|
||||
.version_id
|
||||
.and_then(none_if_zero_version_id),
|
||||
loader: key.loader.and_then(none_if_empty),
|
||||
game_version: key.game_version.and_then(none_if_empty),
|
||||
country: key
|
||||
.country
|
||||
.map(|country| condense_country(country, seconds)),
|
||||
seconds,
|
||||
}),
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
use futures::StreamExt;
|
||||
use rust_decimal::Decimal;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::{
|
||||
database::{PgPool, models::DBProjectId},
|
||||
models::ids::ProjectId,
|
||||
routes::ApiError,
|
||||
util::error::Context,
|
||||
};
|
||||
|
||||
use super::super::{TimeSlice, add_to_time_slice};
|
||||
use super::{AnalyticsData, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::project_revenue`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ProjectRevenueField {
|
||||
/// Project ID.
|
||||
ProjectId,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::project_revenue`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectRevenueFilters {}
|
||||
|
||||
/// [`super::ReturnMetrics::project_revenue`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectRevenue {
|
||||
/// Total revenue for this bucket.
|
||||
pub(crate) revenue: Decimal,
|
||||
}
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
pool: &PgPool,
|
||||
time_slices: &mut [TimeSlice],
|
||||
req: &super::super::GetRequest,
|
||||
num_time_slices: usize,
|
||||
project_id_values: &[i64],
|
||||
) -> Result<(), ApiError> {
|
||||
let mut rows = sqlx::query(
|
||||
"SELECT
|
||||
WIDTH_BUCKET(
|
||||
EXTRACT(EPOCH FROM created)::bigint,
|
||||
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
|
||||
$3::integer
|
||||
) AS bucket,
|
||||
mod_id,
|
||||
SUM(amount) amount_sum
|
||||
FROM payouts_values
|
||||
WHERE
|
||||
-- only project revenue is counted here
|
||||
-- for affiliate code revenue, see `affiliate_code_revenue`
|
||||
payouts_values.mod_id IS NOT NULL
|
||||
AND payouts_values.mod_id = ANY($4)
|
||||
AND created BETWEEN $1 AND $2
|
||||
GROUP BY bucket, mod_id",
|
||||
)
|
||||
.bind(req.time_range.start)
|
||||
.bind(req.time_range.end)
|
||||
.bind(num_time_slices as i64)
|
||||
.bind(project_id_values)
|
||||
.fetch(pool);
|
||||
while let Some(row) = rows.next().await.transpose()? {
|
||||
let bucket = row
|
||||
.try_get::<Option<i32>, _>("bucket")?
|
||||
.wrap_internal_err("bucket should be non-null - query bug!")?;
|
||||
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
|
||||
eyre::eyre!(
|
||||
"bucket value {bucket} does not fit into `usize` - query bug!"
|
||||
)
|
||||
})?;
|
||||
|
||||
let mod_id = row.try_get::<Option<i64>, _>("mod_id")?;
|
||||
let amount_sum = row.try_get::<Option<Decimal>, _>("amount_sum")?;
|
||||
if let Some(source_project) =
|
||||
mod_id.map(DBProjectId).map(ProjectId::from)
|
||||
&& let Some(revenue) = amount_sum
|
||||
{
|
||||
add_to_time_slice(
|
||||
time_slices,
|
||||
bucket,
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project,
|
||||
metrics: ProjectMetrics::Revenue(ProjectRevenue {
|
||||
revenue,
|
||||
}),
|
||||
}),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,181 @@
|
||||
use const_format::formatcp;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{database::models::DBProjectId, routes::ApiError};
|
||||
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, ClickhouseQueryParams, QueryClickhouseContext,
|
||||
condense_country, none_if_empty, query_clickhouse,
|
||||
};
|
||||
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
|
||||
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
|
||||
const TIME_SLICES: &str = "{time_slices: UInt64}";
|
||||
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
|
||||
|
||||
/// Fields for [`super::ReturnMetrics::project_views`].
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ProjectViewsField {
|
||||
/// Project ID.
|
||||
ProjectId,
|
||||
/// Referrer domain which linked to this project.
|
||||
Domain,
|
||||
/// Modrinth site path which was visited, e.g. `/mod/foo`.
|
||||
SitePath,
|
||||
/// Whether these views were monetized or not.
|
||||
Monetized,
|
||||
/// What country these views came from.
|
||||
///
|
||||
/// To anonymize the data, the country may be reported as `XX`.
|
||||
Country,
|
||||
}
|
||||
|
||||
/// Filters for [`super::ReturnMetrics::project_views`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectViewsFilters {
|
||||
/// Referrer domains to include.
|
||||
#[serde(default)]
|
||||
pub domain: Vec<String>,
|
||||
/// Modrinth site paths to include.
|
||||
#[serde(default)]
|
||||
pub site_path: Vec<String>,
|
||||
/// Monetization states to include.
|
||||
#[serde(default)]
|
||||
pub monetized: Vec<bool>,
|
||||
/// Country codes to include.
|
||||
#[serde(default)]
|
||||
pub country: Vec<String>,
|
||||
}
|
||||
|
||||
/// [`super::ReturnMetrics::project_views`].
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectViews {
|
||||
/// [`ProjectViewsField::Domain`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub domain: Option<String>,
|
||||
/// [`ProjectViewsField::SitePath`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub site_path: Option<String>,
|
||||
/// [`ProjectViewsField::Monetized`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub monetized: Option<bool>,
|
||||
/// [`ProjectViewsField::Country`].
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub country: Option<String>,
|
||||
/// Total number of views for this bucket.
|
||||
pub views: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct ViewRow {
|
||||
bucket: u64,
|
||||
project_id: DBProjectId,
|
||||
domain: String,
|
||||
site_path: String,
|
||||
monetized: i8,
|
||||
country: String,
|
||||
views: u64,
|
||||
}
|
||||
|
||||
const VIEWS: &str = {
|
||||
const USE_PROJECT_ID: &str = "{use_project_id: Bool}";
|
||||
const USE_DOMAIN: &str = "{use_domain: Bool}";
|
||||
const USE_SITE_PATH: &str = "{use_site_path: Bool}";
|
||||
const USE_MONETIZED: &str = "{use_monetized: Bool}";
|
||||
const USE_COUNTRY: &str = "{use_country: Bool}";
|
||||
const FILTER_DOMAIN: &str = "{filter_domain: Array(String)}";
|
||||
const FILTER_SITE_PATH: &str = "{filter_site_path: Array(String)}";
|
||||
const FILTER_MONETIZED: &str = "{filter_monetized: UInt8}";
|
||||
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
|
||||
|
||||
formatcp!(
|
||||
"SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
if({USE_PROJECT_ID}, project_id, 0) AS project_id,
|
||||
if({USE_DOMAIN}, domain, '') AS domain,
|
||||
if({USE_SITE_PATH}, site_path, '') AS site_path,
|
||||
if({USE_MONETIZED}, CAST(monetized AS Int8), -1) AS monetized,
|
||||
if({USE_COUNTRY}, country, '') AS country,
|
||||
COUNT(*) AS views
|
||||
FROM views
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
-- make sure that the REAL project id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `views.project_id` instead of `project_id`
|
||||
AND views.project_id IN {PROJECT_IDS}
|
||||
AND (empty({FILTER_DOMAIN}) OR views.domain IN {FILTER_DOMAIN})
|
||||
AND (empty({FILTER_SITE_PATH}) OR views.site_path IN {FILTER_SITE_PATH})
|
||||
AND ({FILTER_MONETIZED} = 2 OR CAST(views.monetized AS UInt8) = {FILTER_MONETIZED})
|
||||
AND (empty({FILTER_COUNTRY}) OR views.country IN {FILTER_COUNTRY})
|
||||
GROUP BY bucket, project_id, domain, site_path, monetized, country
|
||||
"
|
||||
)
|
||||
};
|
||||
|
||||
pub(crate) async fn fetch(
|
||||
cx: &mut QueryClickhouseContext<'_>,
|
||||
metrics: &Metrics<ProjectViewsField, ProjectViewsFilters>,
|
||||
) -> Result<(), ApiError> {
|
||||
use ProjectViewsField as F;
|
||||
let uses = |field| metrics.bucket_by.contains(&field);
|
||||
|
||||
query_clickhouse::<ViewRow>(
|
||||
cx,
|
||||
VIEWS,
|
||||
ClickhouseQueryParams::PROJECT_IDS,
|
||||
&[
|
||||
("use_project_id", uses(F::ProjectId)),
|
||||
("use_domain", uses(F::Domain)),
|
||||
("use_site_path", uses(F::SitePath)),
|
||||
("use_monetized", uses(F::Monetized)),
|
||||
("use_country", uses(F::Country)),
|
||||
],
|
||||
vec![
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_domain",
|
||||
&metrics.filter_by.domain,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_site_path",
|
||||
&metrics.filter_by.site_path,
|
||||
),
|
||||
ClickhouseFilterParam::Bool(
|
||||
"filter_monetized",
|
||||
&metrics.filter_by.monetized,
|
||||
),
|
||||
ClickhouseFilterParam::String(
|
||||
"filter_country",
|
||||
&metrics.filter_by.country,
|
||||
),
|
||||
],
|
||||
|_| true,
|
||||
|row| row.bucket,
|
||||
|row| {
|
||||
let country = if uses(F::Country) {
|
||||
Some(condense_country(row.country, row.views))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: row.project_id.into(),
|
||||
metrics: ProjectMetrics::Views(ProjectViews {
|
||||
domain: none_if_empty(row.domain),
|
||||
site_path: none_if_empty(row.site_path),
|
||||
monetized: match row.monetized {
|
||||
0 => Some(false),
|
||||
1 => Some(true),
|
||||
_ => None,
|
||||
},
|
||||
country,
|
||||
views: row.views,
|
||||
}),
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -0,0 +1,813 @@
|
||||
//! # Design rationale
|
||||
//!
|
||||
//! - different metrics require different scopes
|
||||
//! - views, downloads, playtime requires `Scopes::ANALYTICS`
|
||||
//! - revenue requires `Scopes::PAYOUTS_READ`
|
||||
//! - each request returns an array of N elements; if you have to make multiple
|
||||
//! requests, you have to zip together M arrays of N elements
|
||||
//! - this makes it inconvenient to have separate endpoints
|
||||
|
||||
mod facets;
|
||||
mod metrics;
|
||||
mod old;
|
||||
|
||||
use std::{collections::HashMap, num::NonZeroU64};
|
||||
|
||||
use crate::database::PgPool;
|
||||
use actix_web::{HttpRequest, post, web};
|
||||
use chrono::{DateTime, TimeDelta, Utc};
|
||||
use eyre::eyre;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
auth::{
|
||||
AuthenticationError, checks::filter_visible_version_ids,
|
||||
get_user_from_headers,
|
||||
},
|
||||
database::{
|
||||
self, DBProject,
|
||||
models::{
|
||||
DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBVersion,
|
||||
DBVersionId,
|
||||
},
|
||||
redis::RedisPool,
|
||||
},
|
||||
models::{
|
||||
ids::{AffiliateCodeId, ProjectId, VersionId},
|
||||
pats::Scopes,
|
||||
projects::ProjectStatus,
|
||||
teams::ProjectPermissions,
|
||||
threads::MessageBody,
|
||||
v3::analytics::DownloadReason,
|
||||
},
|
||||
queue::session::AuthQueue,
|
||||
routes::ApiError,
|
||||
};
|
||||
|
||||
pub(crate) use metrics::normalize_download_source;
|
||||
pub use metrics::*;
|
||||
|
||||
pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) {
|
||||
cfg.service(fetch_analytics);
|
||||
cfg.configure(facets::config);
|
||||
cfg.configure(old::config);
|
||||
}
|
||||
|
||||
// request
|
||||
|
||||
/// Requests analytics data, aggregating over all possible analytics sources
|
||||
/// like projects and affiliate codes, returning the data in a list of time
|
||||
/// slices.
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct GetRequest {
|
||||
/// What time range to return statistics for.
|
||||
pub time_range: TimeRange,
|
||||
/// What analytics metrics to return data for.
|
||||
#[serde(default)]
|
||||
pub return_metrics: ReturnMetrics,
|
||||
/// What project IDs to return data for.
|
||||
///
|
||||
/// If this is empty, all of the user's projects will be included.
|
||||
#[serde(default)]
|
||||
pub project_ids: Vec<ProjectId>,
|
||||
}
|
||||
|
||||
/// Time range for fetching analytics.
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct TimeRange {
|
||||
/// When to start including data.
|
||||
pub start: DateTime<Utc>,
|
||||
/// When to stop including data.
|
||||
pub end: DateTime<Utc>,
|
||||
/// Determines how many time slices between the start and end will be
|
||||
/// included, and how fine-grained those time slices will be.
|
||||
///
|
||||
/// This must fall within the bounds of [`MIN_RESOLUTION`] and
|
||||
/// [`MAX_TIME_SLICES`].
|
||||
pub resolution: TimeRangeResolution,
|
||||
}
|
||||
|
||||
/// Determines how many time slices between the start and end will be
|
||||
/// included, and how fine-grained those time slices will be.
|
||||
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TimeRangeResolution {
|
||||
/// Use a set number of time slices, with the resolution being determined
|
||||
/// automatically.
|
||||
#[schema(value_type = u64)]
|
||||
Slices(NonZeroU64),
|
||||
/// Each time slice will be a set number of minutes long, and the number of
|
||||
/// slices is determined automatically.
|
||||
#[schema(value_type = u64)]
|
||||
Minutes(NonZeroU64),
|
||||
}
|
||||
|
||||
/// Minimum width of a [`TimeSlice`], controlled by [`TimeRange::resolution`].
|
||||
pub const MIN_RESOLUTION: TimeDelta = TimeDelta::minutes(60);
|
||||
|
||||
/// Maximum number of [`TimeSlice`]s in a [`GetResponse`], controlled by
|
||||
/// [`TimeRange::resolution`].
|
||||
pub const MAX_TIME_SLICES: usize = 1024;
|
||||
|
||||
// response
|
||||
|
||||
/// Response for a [`GetRequest`].
|
||||
#[derive(Debug, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct GetResponse {
|
||||
/// List of N [`TimeSlice`]s, where each slice represents an equal
|
||||
/// time interval of metrics collection. The number of slices is determined
|
||||
/// by [`GetRequest::time_range`].
|
||||
pub metrics: Vec<TimeSlice>,
|
||||
/// List of events associated with projects that were requested.
|
||||
pub project_events: Vec<ProjectAnalyticsEvent>,
|
||||
}
|
||||
|
||||
/// Single time interval of metrics collection.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct TimeSlice(pub Vec<AnalyticsData>);
|
||||
|
||||
/// Notable update to a project which may reflect in analytics metrics.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct ProjectAnalyticsEvent {
|
||||
/// ID of the event's project.
|
||||
pub project_id: ProjectId,
|
||||
/// When the event occurred.
|
||||
pub timestamp: DateTime<Utc>,
|
||||
#[serde(flatten)]
|
||||
pub kind: ProjectAnalyticsEventKind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum ProjectAnalyticsEventKind {
|
||||
/// New version of this project was uploaded.
|
||||
VersionUploaded {
|
||||
version_id: VersionId,
|
||||
version_name: String,
|
||||
version_number: String,
|
||||
},
|
||||
/// Project changed status.
|
||||
StatusChanged {
|
||||
status_from: ProjectStatus,
|
||||
status_to: ProjectStatus,
|
||||
},
|
||||
}
|
||||
|
||||
// logic
|
||||
|
||||
/// Fetches analytics data for the authorized user's projects.
|
||||
#[utoipa::path(
|
||||
responses((status = OK, body = inline(GetResponse))),
|
||||
)]
|
||||
#[post("")]
|
||||
pub async fn fetch_analytics(
|
||||
http_req: HttpRequest,
|
||||
req: web::Json<GetRequest>,
|
||||
pool: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
clickhouse: web::Data<clickhouse::Client>,
|
||||
) -> Result<web::Json<GetResponse>, ApiError> {
|
||||
let (scopes, user) = get_user_from_headers(
|
||||
&http_req,
|
||||
&**pool,
|
||||
&redis,
|
||||
&session_queue,
|
||||
Scopes::ANALYTICS,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let full_time_range = req.time_range.end - req.time_range.start;
|
||||
if full_time_range < TimeDelta::zero() {
|
||||
return Err(ApiError::InvalidInput(
|
||||
"End date must be after start date".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let (num_time_slices, resolution) = match req.time_range.resolution {
|
||||
TimeRangeResolution::Slices(slices) => {
|
||||
let slices = i32::try_from(slices.get()).map_err(|_| {
|
||||
ApiError::InvalidInput(
|
||||
"Number of slices must fit into an `i32`".into(),
|
||||
)
|
||||
})?;
|
||||
let resolution = full_time_range / slices;
|
||||
(slices as usize, resolution)
|
||||
}
|
||||
TimeRangeResolution::Minutes(resolution_minutes) => {
|
||||
let resolution_minutes = i64::try_from(resolution_minutes.get())
|
||||
.map_err(|_| {
|
||||
ApiError::InvalidInput(
|
||||
"Resolution must fit into a `i64`".into(),
|
||||
)
|
||||
})?;
|
||||
let resolution = TimeDelta::try_minutes(resolution_minutes)
|
||||
.ok_or_else(|| {
|
||||
ApiError::InvalidInput("Resolution overflow".into())
|
||||
})?;
|
||||
|
||||
let num_slices =
|
||||
full_time_range.as_seconds_f64() / resolution.as_seconds_f64();
|
||||
|
||||
(num_slices as usize, resolution)
|
||||
}
|
||||
};
|
||||
|
||||
if num_time_slices > MAX_TIME_SLICES {
|
||||
return Err(ApiError::Request(eyre!(
|
||||
"Resolution is too fine or range is too large - maximum of {MAX_TIME_SLICES} time slices, was {num_time_slices}"
|
||||
)));
|
||||
}
|
||||
if resolution < MIN_RESOLUTION {
|
||||
return Err(ApiError::Request(eyre!(
|
||||
"Resolution must be at least {MIN_RESOLUTION}, was {resolution}",
|
||||
)));
|
||||
}
|
||||
|
||||
let mut time_slices = vec![TimeSlice::default(); num_time_slices];
|
||||
|
||||
let project_ids = {
|
||||
if req.project_ids.is_empty() {
|
||||
DBUser::get_projects(user.id.into(), &**pool, &redis).await?
|
||||
} else {
|
||||
req.project_ids
|
||||
.iter()
|
||||
.map(|id| DBProjectId::from(*id))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
};
|
||||
|
||||
let project_ids =
|
||||
filter_allowed_project_ids(&project_ids, &user, &pool, &redis).await?;
|
||||
|
||||
let project_id_values =
|
||||
project_ids.iter().map(|id| id.0).collect::<Vec<_>>();
|
||||
let parent_versions = sqlx::query!(
|
||||
"
|
||||
SELECT id, mod_id
|
||||
FROM versions
|
||||
WHERE mod_id = ANY($1)
|
||||
",
|
||||
&project_id_values,
|
||||
)
|
||||
.fetch_all(&**pool)
|
||||
.await?;
|
||||
let parent_version_ids = parent_versions
|
||||
.iter()
|
||||
.map(|version| DBVersionId(version.id))
|
||||
.collect::<Vec<_>>();
|
||||
let parent_version_projects = parent_versions
|
||||
.iter()
|
||||
.map(|version| (DBVersionId(version.id), DBProjectId(version.mod_id)))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let parent_version_data =
|
||||
DBVersion::get_many(&parent_version_ids, &**pool, &redis).await?;
|
||||
let visible_version_ids = filter_visible_version_ids(
|
||||
parent_version_data
|
||||
.iter()
|
||||
.map(|version| &version.inner)
|
||||
.collect(),
|
||||
&Some(user.clone()),
|
||||
&pool,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
let mut project_events = parent_version_data
|
||||
.iter()
|
||||
.filter(|version| {
|
||||
visible_version_ids.contains(&version.inner.id)
|
||||
&& version.inner.date_published >= req.time_range.start
|
||||
&& version.inner.date_published <= req.time_range.end
|
||||
})
|
||||
.map(|version| ProjectAnalyticsEvent {
|
||||
project_id: version.inner.project_id.into(),
|
||||
timestamp: version.inner.date_published,
|
||||
kind: ProjectAnalyticsEventKind::VersionUploaded {
|
||||
version_id: version.inner.id.into(),
|
||||
version_name: version.inner.name.clone(),
|
||||
version_number: version.inner.version_number.clone(),
|
||||
},
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
project_events.extend(
|
||||
fetch_project_status_change_events(
|
||||
&project_ids,
|
||||
&req.time_range,
|
||||
&pool,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
project_events.sort_by_key(|event| event.timestamp);
|
||||
|
||||
let affiliate_code_ids =
|
||||
DBAffiliateCode::get_by_affiliate(user.id.into(), &**pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|code| code.id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut query_clickhouse_cx = QueryClickhouseContext {
|
||||
clickhouse: &clickhouse,
|
||||
req: &req,
|
||||
time_slices: &mut time_slices,
|
||||
project_ids: &project_ids,
|
||||
parent_version_ids: &parent_version_ids,
|
||||
affiliate_code_ids: &affiliate_code_ids,
|
||||
};
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.project_views {
|
||||
metrics::fetch_project_views(&mut query_clickhouse_cx, metrics).await?;
|
||||
}
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.project_downloads {
|
||||
metrics::fetch_project_downloads(&mut query_clickhouse_cx, metrics)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.project_playtime {
|
||||
metrics::fetch_project_playtime(
|
||||
&mut query_clickhouse_cx,
|
||||
&parent_version_projects,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.affiliate_code_clicks {
|
||||
metrics::fetch_affiliate_code_clicks(&mut query_clickhouse_cx, metrics)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if req.return_metrics.project_revenue.is_some() {
|
||||
if !scopes.contains(Scopes::PAYOUTS_READ) {
|
||||
return Err(AuthenticationError::InvalidCredentials.into());
|
||||
}
|
||||
|
||||
metrics::fetch_project_revenue(
|
||||
&pool,
|
||||
&mut time_slices,
|
||||
&req,
|
||||
num_time_slices,
|
||||
&project_id_values,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.affiliate_code_conversions {
|
||||
metrics::fetch_affiliate_code_conversions(
|
||||
&pool,
|
||||
&mut time_slices,
|
||||
&req,
|
||||
user.id.into(),
|
||||
num_time_slices,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.affiliate_code_revenue {
|
||||
if !scopes.contains(Scopes::PAYOUTS_READ) {
|
||||
return Err(AuthenticationError::InvalidCredentials.into());
|
||||
}
|
||||
|
||||
metrics::fetch_affiliate_code_revenue(
|
||||
&pool,
|
||||
&mut time_slices,
|
||||
&req,
|
||||
user.id.into(),
|
||||
num_time_slices,
|
||||
metrics,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(web::Json(GetResponse {
|
||||
metrics: time_slices,
|
||||
project_events,
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn none_if_empty(s: String) -> Option<String> {
|
||||
if s.is_empty() { None } else { Some(s) }
|
||||
}
|
||||
|
||||
pub(crate) fn none_if_zero_version_id(v: DBVersionId) -> Option<VersionId> {
|
||||
if v.0 == 0 { None } else { Some(v.into()) }
|
||||
}
|
||||
|
||||
pub(crate) fn condense_country(country: String, count: u64) -> String {
|
||||
// Every country under '50' (view or downloads) should be condensed into 'XX'
|
||||
if count < 50 {
|
||||
"XX".to_string()
|
||||
} else {
|
||||
country
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_project_status_change_events(
|
||||
project_ids: &[DBProjectId],
|
||||
time_range: &TimeRange,
|
||||
pool: &PgPool,
|
||||
) -> Result<Vec<ProjectAnalyticsEvent>, ApiError> {
|
||||
let project_id_values =
|
||||
project_ids.iter().map(|id| id.0).collect::<Vec<_>>();
|
||||
|
||||
let rows = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
t.mod_id AS "project_id!",
|
||||
tm.created,
|
||||
tm.body AS "body: sqlx::types::Json<MessageBody>"
|
||||
FROM threads_messages tm
|
||||
INNER JOIN threads t ON t.id = tm.thread_id
|
||||
WHERE
|
||||
t.mod_id = ANY($1)
|
||||
AND tm.body->>'type' = 'status_change'
|
||||
AND tm.created BETWEEN $2 AND $3
|
||||
"#,
|
||||
&project_id_values,
|
||||
time_range.start,
|
||||
time_range.end,
|
||||
)
|
||||
.fetch_all(&**pool)
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.filter_map(|row| {
|
||||
let MessageBody::StatusChange {
|
||||
old_status,
|
||||
new_status,
|
||||
} = row.body.0
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
|
||||
Some(ProjectAnalyticsEvent {
|
||||
project_id: DBProjectId(row.project_id).into(),
|
||||
timestamp: row.created,
|
||||
kind: ProjectAnalyticsEventKind::StatusChanged {
|
||||
status_from: old_status,
|
||||
status_to: new_status,
|
||||
},
|
||||
})
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub(crate) struct QueryClickhouseContext<'a> {
|
||||
pub(crate) clickhouse: &'a clickhouse::Client,
|
||||
pub(crate) req: &'a GetRequest,
|
||||
pub(crate) time_slices: &'a mut [TimeSlice],
|
||||
pub(crate) project_ids: &'a [DBProjectId],
|
||||
pub(crate) parent_version_ids: &'a [DBVersionId],
|
||||
pub(crate) affiliate_code_ids: &'a [DBAffiliateCodeId],
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct ClickhouseQueryParams {
|
||||
pub(crate) project_ids: bool,
|
||||
pub(crate) parent_version_ids: bool,
|
||||
pub(crate) affiliate_code_ids: bool,
|
||||
}
|
||||
|
||||
pub(crate) enum ClickhouseFilterParam<'a> {
|
||||
String(&'static str, &'a [String]),
|
||||
Bool(&'static str, &'a [bool]),
|
||||
VersionId(&'static str, &'a [VersionId]),
|
||||
AffiliateCodeId(&'static str, &'a [AffiliateCodeId]),
|
||||
DownloadReason(&'static str, &'a [DownloadReason]),
|
||||
}
|
||||
|
||||
impl ClickhouseFilterParam<'_> {
|
||||
pub(crate) fn bind(
|
||||
self,
|
||||
query: clickhouse::query::Query,
|
||||
) -> clickhouse::query::Query {
|
||||
match self {
|
||||
Self::String(name, values) => query.param(name, values),
|
||||
Self::Bool(name, values) => {
|
||||
let value = match values {
|
||||
[false] => 0,
|
||||
[true] => 1,
|
||||
_ => 2,
|
||||
};
|
||||
query.param(name, value)
|
||||
}
|
||||
Self::VersionId(name, values) => {
|
||||
let values = values
|
||||
.iter()
|
||||
.map(|id| DBVersionId::from(*id))
|
||||
.collect::<Vec<_>>();
|
||||
query.param(name, values)
|
||||
}
|
||||
Self::AffiliateCodeId(name, values) => {
|
||||
let values = values
|
||||
.iter()
|
||||
.map(|id| DBAffiliateCodeId::from(*id))
|
||||
.collect::<Vec<_>>();
|
||||
query.param(name, values)
|
||||
}
|
||||
Self::DownloadReason(name, values) => {
|
||||
let values =
|
||||
values.iter().map(ToString::to_string).collect::<Vec<_>>();
|
||||
query.param(name, values)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ClickhouseQueryParams {
|
||||
pub(crate) const PROJECT_IDS: Self = Self {
|
||||
project_ids: true,
|
||||
parent_version_ids: false,
|
||||
affiliate_code_ids: false,
|
||||
};
|
||||
|
||||
pub(crate) const fn empty() -> Self {
|
||||
Self {
|
||||
project_ids: false,
|
||||
parent_version_ids: false,
|
||||
affiliate_code_ids: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::BitOr for ClickhouseQueryParams {
|
||||
type Output = Self;
|
||||
|
||||
fn bitor(self, rhs: Self) -> Self::Output {
|
||||
Self {
|
||||
project_ids: self.project_ids || rhs.project_ids,
|
||||
parent_version_ids: self.parent_version_ids
|
||||
|| rhs.parent_version_ids,
|
||||
affiliate_code_ids: self.affiliate_code_ids
|
||||
|| rhs.affiliate_code_ids,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn query_clickhouse<Row>(
|
||||
cx: &mut QueryClickhouseContext<'_>,
|
||||
query: &str,
|
||||
params: ClickhouseQueryParams,
|
||||
use_columns: &[(&str, bool)],
|
||||
filter_params: Vec<ClickhouseFilterParam<'_>>,
|
||||
row_filter: impl Fn(&Row::Value<'_>) -> bool,
|
||||
// I hate using the hidden type Row::Value here, but it's what next() returns, so I see no other option
|
||||
row_get_bucket: impl Fn(&Row::Value<'_>) -> u64,
|
||||
row_to_analytics: impl Fn(Row::Value<'_>) -> AnalyticsData,
|
||||
) -> Result<(), ApiError>
|
||||
where
|
||||
Row: clickhouse::RowRead + serde::de::DeserializeOwned + std::fmt::Debug,
|
||||
{
|
||||
let mut query = cx
|
||||
.clickhouse
|
||||
.query(query)
|
||||
.param("time_range_start", cx.req.time_range.start.timestamp())
|
||||
.param("time_range_end", cx.req.time_range.end.timestamp())
|
||||
.param("time_slices", cx.time_slices.len());
|
||||
if params.project_ids {
|
||||
query = query.param("project_ids", cx.project_ids);
|
||||
}
|
||||
if params.parent_version_ids {
|
||||
query = query.param("parent_version_ids", cx.parent_version_ids);
|
||||
}
|
||||
if params.affiliate_code_ids {
|
||||
query = query.param("affiliate_code_ids", cx.affiliate_code_ids);
|
||||
}
|
||||
for (param_name, used) in use_columns {
|
||||
query = query.param(param_name, used)
|
||||
}
|
||||
for filter_param in filter_params {
|
||||
query = filter_param.bind(query);
|
||||
}
|
||||
let mut cursor = query.fetch::<Row>()?;
|
||||
|
||||
while let Some(row) = cursor.next().await? {
|
||||
if !row_filter(&row) {
|
||||
continue;
|
||||
}
|
||||
let bucket = row_get_bucket(&row) as usize;
|
||||
add_to_time_slice(cx.time_slices, bucket, row_to_analytics(row))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn add_to_time_slice(
|
||||
time_slices: &mut [TimeSlice],
|
||||
bucket: usize,
|
||||
data: AnalyticsData,
|
||||
) -> Result<(), ApiError> {
|
||||
// row.recorded < time_range_start => bucket = 0
|
||||
// row.recorded >= time_range_end => bucket = num_time_slices
|
||||
// (note: this is out of range of `time_slices`!)
|
||||
let Some(bucket) = bucket.checked_sub(1) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let num_time_slices = time_slices.len();
|
||||
let slice = time_slices.get_mut(bucket).ok_or_else(|| {
|
||||
ApiError::InvalidInput(
|
||||
format!("bucket {bucket} returned by query out of range for {num_time_slices} - query bug!")
|
||||
)
|
||||
})?;
|
||||
|
||||
slice.0.push(data);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn filter_allowed_project_ids(
|
||||
project_ids: &[DBProjectId],
|
||||
user: &crate::models::users::User,
|
||||
pool: &PgPool,
|
||||
redis: &RedisPool,
|
||||
) -> Result<Vec<DBProjectId>, ApiError> {
|
||||
let projects = DBProject::get_many_ids(project_ids, pool, redis).await?;
|
||||
|
||||
let team_ids = projects
|
||||
.iter()
|
||||
.map(|x| x.inner.team_id)
|
||||
.collect::<Vec<database::models::DBTeamId>>();
|
||||
let team_members = database::models::DBTeamMember::get_from_team_full_many(
|
||||
&team_ids, pool, redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let organization_ids = projects
|
||||
.iter()
|
||||
.filter_map(|x| x.inner.organization_id)
|
||||
.collect::<Vec<database::models::DBOrganizationId>>();
|
||||
let organizations = database::models::DBOrganization::get_many_ids(
|
||||
&organization_ids,
|
||||
pool,
|
||||
redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let organization_team_ids = organizations
|
||||
.iter()
|
||||
.map(|x| x.team_id)
|
||||
.collect::<Vec<database::models::DBTeamId>>();
|
||||
let organization_team_members =
|
||||
database::models::DBTeamMember::get_from_team_full_many(
|
||||
&organization_team_ids,
|
||||
pool,
|
||||
redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(projects
|
||||
.into_iter()
|
||||
.filter(|project| {
|
||||
let team_member = team_members.iter().find(|x| {
|
||||
x.team_id == project.inner.team_id
|
||||
&& x.user_id == user.id.into()
|
||||
});
|
||||
|
||||
let organization = project
|
||||
.inner
|
||||
.organization_id
|
||||
.and_then(|oid| organizations.iter().find(|x| x.id == oid));
|
||||
|
||||
let organization_team_member =
|
||||
if let Some(organization) = organization {
|
||||
organization_team_members.iter().find(|x| {
|
||||
x.team_id == organization.team_id
|
||||
&& x.user_id == user.id.into()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let permissions = ProjectPermissions::get_permissions_by_role(
|
||||
&user.role,
|
||||
&team_member.cloned(),
|
||||
&organization_team_member.cloned(),
|
||||
)
|
||||
.unwrap_or_default();
|
||||
|
||||
permissions.contains(ProjectPermissions::VIEW_ANALYTICS)
|
||||
})
|
||||
.map(|project| project.inner.id)
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use rust_decimal::Decimal;
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn normalizes_download_sources() {
|
||||
let cases = [
|
||||
("MultiMC/5.0", Some(DownloadSource::Named("MultiMC".into()))),
|
||||
(
|
||||
"PrismLauncher/6.1",
|
||||
Some(DownloadSource::Named("Prism Launcher".into())),
|
||||
),
|
||||
(
|
||||
"modrinth/theseus/0.8.6 (support@modrinth.com)",
|
||||
Some(DownloadSource::ModrinthApp),
|
||||
),
|
||||
(
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
|
||||
Some(DownloadSource::Website),
|
||||
),
|
||||
("curl/8.7.1", None),
|
||||
];
|
||||
|
||||
for (user_agent, source) in cases {
|
||||
assert_eq!(normalize_download_source(user_agent), source);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_source_serializes_as_raw_string() {
|
||||
assert_eq!(
|
||||
serde_json::to_value(DownloadSource::Named("MultiMC".into()))
|
||||
.unwrap(),
|
||||
json!("MultiMC")
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(DownloadSource::Website).unwrap(),
|
||||
json!("website")
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(DownloadSource::ModrinthApp).unwrap(),
|
||||
json!("modrinth_app")
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(DownloadSource::Other).unwrap(),
|
||||
json!("other")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn response_format() {
|
||||
let test_project_1 = ProjectId(123);
|
||||
let test_project_2 = ProjectId(456);
|
||||
let test_project_3 = ProjectId(789);
|
||||
|
||||
let src = GetResponse {
|
||||
metrics: vec![
|
||||
TimeSlice(vec![
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: test_project_1,
|
||||
metrics: ProjectMetrics::Views(ProjectViews {
|
||||
domain: Some("youtube.com".into()),
|
||||
views: 100,
|
||||
..Default::default()
|
||||
}),
|
||||
}),
|
||||
AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: test_project_2,
|
||||
metrics: ProjectMetrics::Downloads(ProjectDownloads {
|
||||
domain: Some("discord.com".into()),
|
||||
downloads: 150,
|
||||
..Default::default()
|
||||
}),
|
||||
}),
|
||||
]),
|
||||
TimeSlice(vec![AnalyticsData::Project(ProjectAnalytics {
|
||||
source_project: test_project_3,
|
||||
metrics: ProjectMetrics::Revenue(ProjectRevenue {
|
||||
revenue: Decimal::new(20000, 2),
|
||||
}),
|
||||
})]),
|
||||
],
|
||||
project_events: vec![],
|
||||
};
|
||||
let target = json!({
|
||||
"metrics": [
|
||||
[
|
||||
{
|
||||
"source_project": test_project_1.to_string(),
|
||||
"metric_kind": "views",
|
||||
"domain": "youtube.com",
|
||||
"views": 100,
|
||||
},
|
||||
{
|
||||
"source_project": test_project_2.to_string(),
|
||||
"metric_kind": "downloads",
|
||||
"domain": "discord.com",
|
||||
"downloads": 150,
|
||||
}
|
||||
],
|
||||
[
|
||||
{
|
||||
"source_project": test_project_3.to_string(),
|
||||
"metric_kind": "revenue",
|
||||
"revenue": "200.00",
|
||||
}
|
||||
]
|
||||
],
|
||||
"project_events": []
|
||||
});
|
||||
|
||||
assert_eq!(serde_json::to_value(src).unwrap(), target);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use super::{ApiError, oauth_clients::get_user_clients};
|
||||
use crate::database::PgPool;
|
||||
@@ -10,12 +13,14 @@ use crate::{
|
||||
get_user_from_headers,
|
||||
},
|
||||
database::{
|
||||
models::{DBModerationNote, DBUser},
|
||||
models::{DBModerationNote, DBOrganization, DBProjectId, DBUser},
|
||||
redis::RedisPool,
|
||||
},
|
||||
file_hosting::{FileHost, FileHostPublicity},
|
||||
models::{
|
||||
ids::OrganizationId,
|
||||
notifications::Notification,
|
||||
organizations::Organization,
|
||||
pats::Scopes,
|
||||
projects::Project,
|
||||
users::{Badges, Role},
|
||||
@@ -35,6 +40,7 @@ pub fn config(cfg: &mut web::ServiceConfig) {
|
||||
cfg.route("user", web::get().to(user_auth_get));
|
||||
cfg.route("users", web::get().to(users_get));
|
||||
cfg.route("user_email", web::get().to(admin_user_email));
|
||||
cfg.route("all-projects", web::get().to(all_projects));
|
||||
|
||||
cfg.service(
|
||||
web::scope("user")
|
||||
@@ -53,11 +59,135 @@ pub fn config(cfg: &mut web::ServiceConfig) {
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct AllProjectsResponse {
|
||||
pub projects: Vec<Project>,
|
||||
pub organizations: HashMap<OrganizationId, Organization>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct UserEmailQuery {
|
||||
pub email: String,
|
||||
}
|
||||
|
||||
pub async fn all_projects(
|
||||
req: HttpRequest,
|
||||
pool: web::Data<PgPool>,
|
||||
redis: web::Data<RedisPool>,
|
||||
session_queue: web::Data<AuthQueue>,
|
||||
) -> Result<web::Json<AllProjectsResponse>, ApiError> {
|
||||
let user = get_user_from_headers(
|
||||
&req,
|
||||
&**pool,
|
||||
&redis,
|
||||
&session_queue,
|
||||
Scopes::PROJECT_READ | Scopes::ORGANIZATION_READ,
|
||||
)
|
||||
.await?
|
||||
.1;
|
||||
|
||||
let user_project_ids =
|
||||
DBUser::get_projects(user.id.into(), &**pool, &redis).await?;
|
||||
let organization_ids =
|
||||
DBUser::get_organizations(user.id.into(), &**pool).await?;
|
||||
let organizations_data =
|
||||
DBOrganization::get_many_ids(&organization_ids, &**pool, &redis)
|
||||
.await?;
|
||||
|
||||
let team_ids = organizations_data
|
||||
.iter()
|
||||
.map(|organization| organization.team_id)
|
||||
.collect::<Vec<_>>();
|
||||
let teams_data =
|
||||
crate::database::models::DBTeamMember::get_from_team_full_many(
|
||||
&team_ids, &**pool, &redis,
|
||||
)
|
||||
.await?;
|
||||
let users = DBUser::get_many_ids(
|
||||
&teams_data
|
||||
.iter()
|
||||
.map(|member| member.user_id)
|
||||
.collect::<Vec<_>>(),
|
||||
&**pool,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut team_groups = HashMap::new();
|
||||
for member in teams_data {
|
||||
team_groups
|
||||
.entry(member.team_id)
|
||||
.or_insert(vec![])
|
||||
.push(member);
|
||||
}
|
||||
|
||||
let mut organizations = HashMap::new();
|
||||
let mut visible_organization_ids = Vec::new();
|
||||
for data in organizations_data {
|
||||
if !is_visible_organization(&data, &Some(user.clone()), &pool, &redis)
|
||||
.await?
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
visible_organization_ids.push(data.id);
|
||||
let members_data = team_groups.remove(&data.team_id).unwrap_or(vec![]);
|
||||
let team_members = members_data
|
||||
.into_iter()
|
||||
.filter_map(|data| {
|
||||
users.iter().find(|x| x.id == data.user_id).map(|member| {
|
||||
crate::models::teams::TeamMember::from(
|
||||
data,
|
||||
member.clone(),
|
||||
false,
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
organizations.insert(
|
||||
OrganizationId::from(data.id),
|
||||
Organization::from(data, team_members),
|
||||
);
|
||||
}
|
||||
|
||||
let organization_id_values = visible_organization_ids
|
||||
.iter()
|
||||
.map(|id| id.0)
|
||||
.collect::<Vec<_>>();
|
||||
let organization_project_ids = sqlx::query!(
|
||||
"
|
||||
SELECT m.id
|
||||
FROM mods m
|
||||
WHERE m.organization_id = ANY($1)
|
||||
",
|
||||
&organization_id_values,
|
||||
)
|
||||
.fetch_all(&**pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| DBProjectId(row.id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let project_ids = user_project_ids
|
||||
.into_iter()
|
||||
.chain(organization_project_ids)
|
||||
.collect::<HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
let projects_data =
|
||||
crate::database::DBProject::get_many_ids(&project_ids, &**pool, &redis)
|
||||
.await?;
|
||||
let projects =
|
||||
filter_visible_projects(projects_data, &Some(user), &pool, true)
|
||||
.await?;
|
||||
|
||||
Ok(web::Json(AllProjectsResponse {
|
||||
projects,
|
||||
organizations,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn admin_user_email(
|
||||
req: HttpRequest,
|
||||
pool: web::Data<PgPool>,
|
||||
|
||||
Reference in New Issue
Block a user