You've already forked AstralRinth
Filter out invalid loaders in analytics, fix query bug (#6241)
* should fix bucketing query bug * Filter out invalid loaders, fix query bug * fixes
This commit is contained in:
+2
-2
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start\n FROM payouts_values\n WHERE user_id = $1 AND created BETWEEN $2 AND $3\n GROUP by mod_id, interval_start ORDER BY interval_start\n ",
|
||||
"query": "\n SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start\n FROM payouts_values\n WHERE user_id = $1 AND created >= $2 AND created < $3\n GROUP by mod_id, interval_start ORDER BY interval_start\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -33,5 +33,5 @@
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "dfb4bd3db0d1cc2b2f811c267547a224ee4710e202cf1c8f3f35e49b54d6f2f9"
|
||||
"hash": "33181ac6fe503d4ca6b396d6ddd7070d2abac466048c870b2f23497848cf55b3"
|
||||
}
|
||||
+2
-2
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start\n FROM payouts_values\n WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3\n GROUP by mod_id, interval_start ORDER BY interval_start\n ",
|
||||
"query": "\n SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start\n FROM payouts_values\n WHERE mod_id = ANY($1) AND created >= $2 AND created < $3\n GROUP by mod_id, interval_start ORDER BY interval_start\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -33,5 +33,5 @@
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "4198ea701f956dd65cab1a8e60b5b67df45f8c07bb70e3c4f090d943feafdaf3"
|
||||
"hash": "41da353ec0bfb3209f095e1e4f932beedc4b3b1c6de8365b7b523b9d917977b2"
|
||||
}
|
||||
+2
-2
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n t.mod_id AS \"project_id!\",\n tm.created,\n tm.body AS \"body: sqlx::types::Json<MessageBody>\"\n FROM threads_messages tm\n INNER JOIN threads t ON t.id = tm.thread_id\n WHERE\n t.mod_id = ANY($1)\n AND tm.body->>'type' = 'status_change'\n AND tm.created BETWEEN $2 AND $3\n ",
|
||||
"query": "\n SELECT\n t.mod_id AS \"project_id!\",\n tm.created,\n tm.body AS \"body: sqlx::types::Json<MessageBody>\"\n FROM threads_messages tm\n INNER JOIN threads t ON t.id = tm.thread_id\n WHERE\n t.mod_id = ANY($1)\n AND tm.body->>'type' = 'status_change'\n AND tm.created >= $2\n AND tm.created < $3\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@@ -32,5 +32,5 @@
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "a5141c0435441f062231c842cb5db5e0c78f8f3896c1e9f2b2b56cce41fa1591"
|
||||
"hash": "550dfe07e122a331b92e931ec2b4e4a3a77e8bacdd5a7e2e13b06aa6b55fceb3"
|
||||
}
|
||||
@@ -1,14 +1,17 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use actix_web::{HttpRequest, post, web};
|
||||
use serde::Serialize;
|
||||
|
||||
use super::{DownloadSource, GetRequest, TimeRange, normalize_download_source};
|
||||
use super::{
|
||||
DownloadSource, GetRequest, TimeRange, normalize_download_source,
|
||||
normalize_loader_for_project,
|
||||
};
|
||||
use crate::{
|
||||
auth::get_user_from_headers,
|
||||
database::{
|
||||
PgPool,
|
||||
models::{DBProjectId, DBUser, DBVersionId},
|
||||
models::{DBProjectId, DBUser, DBVersion, DBVersionId},
|
||||
redis::RedisPool,
|
||||
},
|
||||
models::{ids::VersionId, pats::Scopes, v3::analytics::DownloadReason},
|
||||
@@ -72,6 +75,21 @@ struct StringFacetRow {
|
||||
count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct ProjectStringFacetRow {
|
||||
project_id: DBProjectId,
|
||||
value: String,
|
||||
count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct PlaytimeLoaderFacetRow {
|
||||
project_id: DBProjectId,
|
||||
parent_version_id: DBVersionId,
|
||||
value: String,
|
||||
count: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct VersionFacetRow {
|
||||
value: DBVersionId,
|
||||
@@ -120,30 +138,40 @@ pub async fn fetch_facets(
|
||||
|
||||
let parent_version_ids =
|
||||
fetch_project_version_ids(&project_ids, &pool).await?;
|
||||
let parent_version_data =
|
||||
DBVersion::get_many(&parent_version_ids, &**pool, &redis).await?;
|
||||
let project_loaders = super::project_loader_map(&parent_version_data);
|
||||
let parent_version_projects = parent_version_data
|
||||
.iter()
|
||||
.map(|version| (version.inner.id, version.inner.project_id))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Ok(web::Json(FacetsResponse {
|
||||
facets: AnalyticsFacets {
|
||||
project_views: fetch_project_views_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&req.time_range,
|
||||
)
|
||||
.await?,
|
||||
project_downloads: fetch_project_downloads_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&req.time_range,
|
||||
)
|
||||
.await?,
|
||||
project_playtime: fetch_project_playtime_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&parent_version_ids,
|
||||
&req.time_range,
|
||||
)
|
||||
.await?,
|
||||
},
|
||||
}))
|
||||
let facets = AnalyticsFacets {
|
||||
project_views: fetch_project_views_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&req.time_range,
|
||||
)
|
||||
.await?,
|
||||
project_downloads: fetch_project_downloads_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&req.time_range,
|
||||
&project_loaders,
|
||||
)
|
||||
.await?,
|
||||
project_playtime: fetch_project_playtime_facets(
|
||||
&clickhouse,
|
||||
&project_ids,
|
||||
&parent_version_ids,
|
||||
&req.time_range,
|
||||
&project_loaders,
|
||||
&parent_version_projects,
|
||||
)
|
||||
.await?,
|
||||
};
|
||||
|
||||
Ok(web::Json(FacetsResponse { facets }))
|
||||
}
|
||||
|
||||
async fn fetch_project_version_ids(
|
||||
@@ -175,28 +203,28 @@ async fn fetch_project_views_facets(
|
||||
Ok(ProjectViewsFacets {
|
||||
domain: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT domain AS value, COUNT(*) AS count FROM views WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND domain != '' GROUP BY value ORDER BY value",
|
||||
"SELECT domain AS value, COUNT(*) AS count FROM views WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND domain != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
site_path: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT site_path AS value, COUNT(*) AS count FROM views WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND site_path != '' GROUP BY value ORDER BY value",
|
||||
"SELECT site_path AS value, COUNT(*) AS count FROM views WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND site_path != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
monetized: fetch_bool_facet(
|
||||
clickhouse,
|
||||
"SELECT monetized AS value, COUNT(*) AS count FROM views WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} GROUP BY value ORDER BY value",
|
||||
"SELECT monetized AS value, COUNT(*) AS count FROM views WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
country: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT country AS value, COUNT(*) AS count FROM views WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND country != '' GROUP BY value ORDER BY value",
|
||||
"SELECT country AS value, COUNT(*) AS count FROM views WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND country != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
@@ -208,10 +236,11 @@ async fn fetch_project_downloads_facets(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
time_range: &TimeRange,
|
||||
project_loaders: &HashMap<DBProjectId, HashSet<String>>,
|
||||
) -> Result<ProjectDownloadsFacets, ApiError> {
|
||||
let user_agents = fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT user_agent AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND user_agent != '' GROUP BY value",
|
||||
"SELECT user_agent AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND user_agent != '' GROUP BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
@@ -221,7 +250,7 @@ async fn fetch_project_downloads_facets(
|
||||
Ok(ProjectDownloadsFacets {
|
||||
domain: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT domain AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND domain != '' GROUP BY value ORDER BY value",
|
||||
"SELECT domain AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND domain != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
@@ -229,28 +258,28 @@ async fn fetch_project_downloads_facets(
|
||||
user_agent,
|
||||
version_id: fetch_version_facet(
|
||||
clickhouse,
|
||||
"SELECT version_id AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND version_id != 0 GROUP BY value ORDER BY value",
|
||||
"SELECT version_id AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND version_id != 0 GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
monetized: fetch_bool_facet(
|
||||
clickhouse,
|
||||
"SELECT user_id != 0 AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} GROUP BY value ORDER BY value",
|
||||
"SELECT user_id != 0 AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
country: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT country AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND country != '' GROUP BY value ORDER BY value",
|
||||
"SELECT country AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND country != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
reason: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT reason AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND reason != '' GROUP BY value ORDER BY value",
|
||||
"SELECT reason AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND reason != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
@@ -265,16 +294,17 @@ async fn fetch_project_downloads_facets(
|
||||
.collect(),
|
||||
game_version: fetch_string_facet(
|
||||
clickhouse,
|
||||
"SELECT game_version AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND game_version != '' GROUP BY value ORDER BY value",
|
||||
"SELECT game_version AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND game_version != '' GROUP BY value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
loader: fetch_string_facet(
|
||||
loader: fetch_project_loader_facet(
|
||||
clickhouse,
|
||||
"SELECT loader AS value, COUNT(*) AS count FROM downloads WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND loader != '' GROUP BY value ORDER BY value",
|
||||
"SELECT project_id, loader AS value, COUNT(*) AS count FROM downloads WHERE recorded >= {time_range_start: Int64} AND recorded < {time_range_end: Int64} AND project_id IN {project_ids: Array(UInt64)} AND loader != '' GROUP BY project_id, value ORDER BY value",
|
||||
project_ids,
|
||||
time_range,
|
||||
project_loaders,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
@@ -317,6 +347,8 @@ async fn fetch_project_playtime_facets(
|
||||
project_ids: &[DBProjectId],
|
||||
parent_version_ids: &[DBVersionId],
|
||||
time_range: &TimeRange,
|
||||
project_loaders: &HashMap<DBProjectId, HashSet<String>>,
|
||||
parent_version_projects: &HashMap<DBVersionId, DBProjectId>,
|
||||
) -> Result<ProjectPlaytimeFacets, ApiError> {
|
||||
Ok(ProjectPlaytimeFacets {
|
||||
version_id: fetch_playtime_version_facet(
|
||||
@@ -326,12 +358,13 @@ async fn fetch_project_playtime_facets(
|
||||
time_range,
|
||||
)
|
||||
.await?,
|
||||
loader: fetch_playtime_string_facet(
|
||||
loader: fetch_playtime_loader_facet(
|
||||
clickhouse,
|
||||
"loader",
|
||||
project_ids,
|
||||
parent_version_ids,
|
||||
time_range,
|
||||
project_loaders,
|
||||
parent_version_projects,
|
||||
)
|
||||
.await?,
|
||||
game_version: fetch_playtime_string_facet(
|
||||
@@ -375,6 +408,32 @@ async fn fetch_string_facet(
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_project_loader_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
query: &str,
|
||||
project_ids: &[DBProjectId],
|
||||
time_range: &TimeRange,
|
||||
project_loaders: &HashMap<DBProjectId, HashSet<String>>,
|
||||
) -> Result<Vec<FacetValue<String>>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(query)
|
||||
.param("time_range_start", time_range.start.timestamp())
|
||||
.param("time_range_end", time_range.end.timestamp())
|
||||
.param("project_ids", project_ids)
|
||||
.fetch::<ProjectStringFacetRow>()?;
|
||||
let mut counts = HashMap::<String, u64>::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
let loader = normalize_loader_for_project(
|
||||
row.value,
|
||||
row.project_id,
|
||||
project_loaders,
|
||||
);
|
||||
*counts.entry(loader).or_default() += row.count;
|
||||
}
|
||||
|
||||
Ok(sorted_string_facets(counts))
|
||||
}
|
||||
|
||||
async fn fetch_version_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
query: &str,
|
||||
@@ -429,7 +488,8 @@ async fn fetch_playtime_string_facet(
|
||||
let query = format!(
|
||||
"SELECT {column} AS value, COUNT(*) AS count
|
||||
FROM playtime
|
||||
WHERE recorded BETWEEN {{time_range_start: Int64}} AND {{time_range_end: Int64}}
|
||||
WHERE recorded >= {{time_range_start: Int64}}
|
||||
AND recorded < {{time_range_end: Int64}}
|
||||
AND (project_id IN {{project_ids: Array(UInt64)}} OR parent IN {{parent_version_ids: Array(UInt64)}})
|
||||
AND {column} != ''
|
||||
GROUP BY value
|
||||
@@ -452,6 +512,62 @@ async fn fetch_playtime_string_facet(
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
async fn fetch_playtime_loader_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
parent_version_ids: &[DBVersionId],
|
||||
time_range: &TimeRange,
|
||||
project_loaders: &HashMap<DBProjectId, HashSet<String>>,
|
||||
parent_version_projects: &HashMap<DBVersionId, DBProjectId>,
|
||||
) -> Result<Vec<FacetValue<String>>, ApiError> {
|
||||
let mut rows = clickhouse
|
||||
.query(
|
||||
"SELECT project_id, parent AS parent_version_id, loader AS value, COUNT(*) AS count
|
||||
FROM playtime
|
||||
WHERE recorded >= {time_range_start: Int64}
|
||||
AND recorded < {time_range_end: Int64}
|
||||
AND (project_id IN {project_ids: Array(UInt64)} OR parent IN {parent_version_ids: Array(UInt64)})
|
||||
AND loader != ''
|
||||
GROUP BY project_id, parent_version_id, value
|
||||
ORDER BY value",
|
||||
)
|
||||
.param("time_range_start", time_range.start.timestamp())
|
||||
.param("time_range_end", time_range.end.timestamp())
|
||||
.param("project_ids", project_ids)
|
||||
.param("parent_version_ids", parent_version_ids)
|
||||
.fetch::<PlaytimeLoaderFacetRow>()?;
|
||||
let mut counts = HashMap::<String, u64>::new();
|
||||
while let Some(row) = rows.next().await? {
|
||||
let project_id = if row.project_id.0 == 0 {
|
||||
parent_version_projects
|
||||
.get(&row.parent_version_id)
|
||||
.copied()
|
||||
.unwrap_or(row.project_id)
|
||||
} else {
|
||||
row.project_id
|
||||
};
|
||||
let loader = normalize_loader_for_project(
|
||||
row.value,
|
||||
project_id,
|
||||
project_loaders,
|
||||
);
|
||||
*counts.entry(loader).or_default() += row.count;
|
||||
}
|
||||
|
||||
Ok(sorted_string_facets(counts))
|
||||
}
|
||||
|
||||
fn sorted_string_facets(
|
||||
counts: HashMap<String, u64>,
|
||||
) -> Vec<FacetValue<String>> {
|
||||
let mut facets = counts
|
||||
.into_iter()
|
||||
.map(|(value, count)| FacetValue { value, count })
|
||||
.collect::<Vec<_>>();
|
||||
facets.sort_by(|a, b| a.value.cmp(&b.value));
|
||||
facets
|
||||
}
|
||||
|
||||
async fn fetch_playtime_version_facet(
|
||||
clickhouse: &clickhouse::Client,
|
||||
project_ids: &[DBProjectId],
|
||||
@@ -462,7 +578,8 @@ async fn fetch_playtime_version_facet(
|
||||
.query(
|
||||
"SELECT version_id AS value, COUNT(*) AS count
|
||||
FROM playtime
|
||||
WHERE recorded BETWEEN {time_range_start: Int64} AND {time_range_end: Int64}
|
||||
WHERE recorded >= {time_range_start: Int64}
|
||||
AND recorded < {time_range_end: Int64}
|
||||
AND (project_id IN {project_ids: Array(UInt64)} OR parent IN {parent_version_ids: Array(UInt64)})
|
||||
AND version_id != 0
|
||||
GROUP BY value
|
||||
|
||||
@@ -62,7 +62,8 @@ const AFFILIATE_CODE_CLICKS: &str = {
|
||||
COUNT(*) AS clicks
|
||||
FROM affiliate_code_clicks
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
recorded >= {TIME_RANGE_START}
|
||||
AND recorded < {TIME_RANGE_END}
|
||||
-- make sure that the REAL affiliate code id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `affiliate_code_clicks.affiliate_code_id` instead of `project_id`
|
||||
|
||||
@@ -75,7 +75,8 @@ pub(crate) async fn fetch(
|
||||
INNER JOIN charges c ON c.subscription_id = us.id
|
||||
WHERE
|
||||
ac.affiliate = $4
|
||||
AND usa.created_at BETWEEN $1 AND $2
|
||||
AND usa.created_at >= $1
|
||||
AND usa.created_at < $2
|
||||
AND c.status = 'succeeded'
|
||||
AND (cardinality($6::bigint[]) = 0 OR affiliate_code = ANY($6))
|
||||
GROUP BY bucket, affiliate_code",
|
||||
|
||||
@@ -71,7 +71,8 @@ pub(crate) async fn fetch(
|
||||
WHERE
|
||||
user_id = $4
|
||||
AND payouts_values.affiliate_code_source IS NOT NULL
|
||||
AND created BETWEEN $1 AND $2
|
||||
AND created >= $1
|
||||
AND created < $2
|
||||
AND (cardinality($6::bigint[]) = 0 OR affiliate_code_source = ANY($6))
|
||||
GROUP BY bucket, affiliate_code_source",
|
||||
)
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::{
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
|
||||
condense_country, none_if_empty, none_if_zero_version_id,
|
||||
normalize_loader_for_project,
|
||||
};
|
||||
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
@@ -169,6 +170,7 @@ impl<'de> Deserialize<'de> for DownloadSource {
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct DownloadRow {
|
||||
bucket: u64,
|
||||
source_project_id: DBProjectId,
|
||||
project_id: DBProjectId,
|
||||
domain: String,
|
||||
user_agent: String,
|
||||
@@ -202,7 +204,8 @@ const DOWNLOADS: &str = {
|
||||
formatcp!(
|
||||
"SELECT
|
||||
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
|
||||
if({USE_PROJECT_ID}, project_id, 0) AS project_id,
|
||||
downloads.project_id AS source_project_id,
|
||||
if({USE_PROJECT_ID}, downloads.project_id, 0) AS project_id,
|
||||
if({USE_DOMAIN}, domain, '') AS domain,
|
||||
if({USE_USER_AGENT}, user_agent, '') AS user_agent,
|
||||
if({USE_VERSION_ID}, version_id, 0) AS version_id,
|
||||
@@ -214,7 +217,8 @@ const DOWNLOADS: &str = {
|
||||
COUNT(*) AS downloads
|
||||
FROM downloads
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
recorded >= {TIME_RANGE_START}
|
||||
AND recorded < {TIME_RANGE_END}
|
||||
-- make sure that the REAL project id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `downloads.project_id` instead of `project_id`
|
||||
@@ -226,7 +230,7 @@ const DOWNLOADS: &str = {
|
||||
AND (empty({FILTER_REASON}) OR downloads.reason IN {FILTER_REASON})
|
||||
AND (empty({FILTER_GAME_VERSION}) OR downloads.game_version IN {FILTER_GAME_VERSION})
|
||||
AND (empty({FILTER_LOADER}) OR downloads.loader IN {FILTER_LOADER})
|
||||
GROUP BY bucket, project_id, domain, user_agent, version_id, monetized, country, reason, game_version, loader"
|
||||
GROUP BY bucket, source_project_id, project_id, domain, user_agent, version_id, monetized, country, reason, game_version, loader"
|
||||
)
|
||||
};
|
||||
|
||||
@@ -351,7 +355,13 @@ pub(crate) async fn fetch(
|
||||
},
|
||||
game_version: uses_column("use_game_version")
|
||||
.then(|| row.game_version.clone()),
|
||||
loader: uses_column("use_loader").then(|| row.loader.clone()),
|
||||
loader: uses_column("use_loader").then(|| {
|
||||
normalize_loader_for_project(
|
||||
row.loader.clone(),
|
||||
row.source_project_id,
|
||||
cx.project_loaders,
|
||||
)
|
||||
}),
|
||||
};
|
||||
|
||||
*buckets.entry(key).or_default() += row.downloads;
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::{
|
||||
use super::super::{
|
||||
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
|
||||
condense_country, none_if_empty, none_if_zero_version_id,
|
||||
normalize_loader_for_project,
|
||||
};
|
||||
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
|
||||
|
||||
@@ -79,6 +80,7 @@ pub struct ProjectPlaytime {
|
||||
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
|
||||
struct PlaytimeRow {
|
||||
bucket: u64,
|
||||
source_project_id: DBProjectId,
|
||||
project_id: DBProjectId,
|
||||
parent_version_id: DBVersionId,
|
||||
version_id: DBVersionId,
|
||||
@@ -103,6 +105,7 @@ const PLAYTIME: &str = {
|
||||
formatcp!(
|
||||
"SELECT
|
||||
bucket,
|
||||
source_project_id,
|
||||
if({USE_PROJECT_ID}, source_project_id, 0) AS project_id,
|
||||
parent_version_id,
|
||||
version_id,
|
||||
@@ -122,7 +125,8 @@ const PLAYTIME: &str = {
|
||||
seconds
|
||||
FROM playtime
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
recorded >= {TIME_RANGE_START}
|
||||
AND recorded < {TIME_RANGE_END}
|
||||
AND playtime.project_id IN {PROJECT_IDS}
|
||||
AND (empty({FILTER_VERSION_ID}) OR playtime.version_id IN {FILTER_VERSION_ID})
|
||||
AND (empty({FILTER_LOADER}) OR playtime.loader IN {FILTER_LOADER})
|
||||
@@ -142,14 +146,15 @@ const PLAYTIME: &str = {
|
||||
seconds
|
||||
FROM playtime
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
recorded >= {TIME_RANGE_START}
|
||||
AND recorded < {TIME_RANGE_END}
|
||||
AND parent IN {PARENT_VERSION_IDS}
|
||||
AND (empty({FILTER_VERSION_ID}) OR playtime.version_id IN {FILTER_VERSION_ID})
|
||||
AND (empty({FILTER_LOADER}) OR playtime.loader IN {FILTER_LOADER})
|
||||
AND (empty({FILTER_GAME_VERSION}) OR playtime.game_version IN {FILTER_GAME_VERSION})
|
||||
AND (empty({FILTER_COUNTRY}) OR playtime.country IN {FILTER_COUNTRY})
|
||||
)
|
||||
GROUP BY bucket, project_id, parent_version_id, version_id, loader, game_version, country"
|
||||
GROUP BY bucket, source_project_id, project_id, parent_version_id, version_id, loader, game_version, country"
|
||||
)
|
||||
};
|
||||
|
||||
@@ -228,11 +233,25 @@ pub(crate) async fn fetch(
|
||||
} else {
|
||||
row.project_id
|
||||
};
|
||||
let source_project_id = if row.source_project_id.0 == 0 {
|
||||
parent_version_projects
|
||||
.get(&row.parent_version_id)
|
||||
.copied()
|
||||
.unwrap_or(row.source_project_id)
|
||||
} else {
|
||||
row.source_project_id
|
||||
};
|
||||
let key = PlaytimeBucket {
|
||||
bucket: row.bucket,
|
||||
project_id,
|
||||
version_id: uses_column("use_version_id").then_some(row.version_id),
|
||||
loader: uses_column("use_loader").then(|| row.loader.clone()),
|
||||
loader: uses_column("use_loader").then(|| {
|
||||
normalize_loader_for_project(
|
||||
row.loader.clone(),
|
||||
source_project_id,
|
||||
cx.project_loaders,
|
||||
)
|
||||
}),
|
||||
game_version: uses_column("use_game_version")
|
||||
.then(|| row.game_version.clone()),
|
||||
country: uses_column("use_country").then(|| row.country.clone()),
|
||||
|
||||
@@ -57,7 +57,8 @@ pub(crate) async fn fetch(
|
||||
-- for affiliate code revenue, see `affiliate_code_revenue`
|
||||
payouts_values.mod_id IS NOT NULL
|
||||
AND payouts_values.mod_id = ANY($4)
|
||||
AND created BETWEEN $1 AND $2
|
||||
AND created >= $1
|
||||
AND created < $2
|
||||
GROUP BY bucket, mod_id",
|
||||
)
|
||||
.bind(req.time_range.start)
|
||||
|
||||
@@ -103,7 +103,8 @@ const VIEWS: &str = {
|
||||
COUNT(*) AS views
|
||||
FROM views
|
||||
WHERE
|
||||
recorded BETWEEN {TIME_RANGE_START} AND {TIME_RANGE_END}
|
||||
recorded >= {TIME_RANGE_START}
|
||||
AND recorded < {TIME_RANGE_END}
|
||||
-- make sure that the REAL project id is included,
|
||||
-- not the possibly-zero one,
|
||||
-- by using `views.project_id` instead of `project_id`
|
||||
|
||||
@@ -11,7 +11,10 @@ mod facets;
|
||||
mod metrics;
|
||||
mod old;
|
||||
|
||||
use std::{collections::HashMap, num::NonZeroU64};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
num::NonZeroU64,
|
||||
};
|
||||
|
||||
use crate::database::PgPool;
|
||||
use actix_web::{HttpRequest, post, web};
|
||||
@@ -26,6 +29,7 @@ use crate::{
|
||||
},
|
||||
database::{
|
||||
self, DBProject,
|
||||
models::version_item::VersionQueryResult,
|
||||
models::{
|
||||
DBAffiliateCode, DBAffiliateCodeId, DBProjectId, DBUser, DBVersion,
|
||||
DBVersionId,
|
||||
@@ -108,6 +112,7 @@ pub const MIN_RESOLUTION: TimeDelta = TimeDelta::minutes(60);
|
||||
/// Maximum number of [`TimeSlice`]s in a [`GetResponse`], controlled by
|
||||
/// [`TimeRange::resolution`].
|
||||
pub const MAX_TIME_SLICES: usize = 1024;
|
||||
pub(crate) const UNKNOWN_LOADER: &str = "unknown";
|
||||
|
||||
// response
|
||||
|
||||
@@ -277,7 +282,7 @@ pub async fn fetch_analytics(
|
||||
.filter(|version| {
|
||||
visible_version_ids.contains(&version.inner.id)
|
||||
&& version.inner.date_published >= req.time_range.start
|
||||
&& version.inner.date_published <= req.time_range.end
|
||||
&& version.inner.date_published < req.time_range.end
|
||||
})
|
||||
.map(|version| ProjectAnalyticsEvent {
|
||||
project_id: version.inner.project_id.into(),
|
||||
@@ -305,6 +310,7 @@ pub async fn fetch_analytics(
|
||||
.into_iter()
|
||||
.map(|code| code.id)
|
||||
.collect::<Vec<_>>();
|
||||
let project_loaders = project_loader_map(&parent_version_data);
|
||||
|
||||
let mut query_clickhouse_cx = QueryClickhouseContext {
|
||||
clickhouse: &clickhouse,
|
||||
@@ -313,6 +319,7 @@ pub async fn fetch_analytics(
|
||||
project_ids: &project_ids,
|
||||
parent_version_ids: &parent_version_ids,
|
||||
affiliate_code_ids: &affiliate_code_ids,
|
||||
project_loaders: &project_loaders,
|
||||
};
|
||||
|
||||
if let Some(metrics) = &req.return_metrics.project_views {
|
||||
@@ -404,6 +411,41 @@ pub(crate) fn condense_country(country: String, count: u64) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn project_loader_map(
|
||||
versions: &[VersionQueryResult],
|
||||
) -> HashMap<DBProjectId, HashSet<String>> {
|
||||
let mut loaders = HashMap::<DBProjectId, HashSet<String>>::new();
|
||||
|
||||
for version in versions {
|
||||
loaders
|
||||
.entry(version.inner.project_id)
|
||||
.or_default()
|
||||
.extend(version.loaders.iter().cloned());
|
||||
}
|
||||
|
||||
loaders
|
||||
}
|
||||
|
||||
pub(crate) fn normalize_loader_for_project(
|
||||
loader: String,
|
||||
project_id: DBProjectId,
|
||||
project_loaders: &HashMap<DBProjectId, HashSet<String>>,
|
||||
) -> String {
|
||||
if loader.is_empty() {
|
||||
return loader;
|
||||
}
|
||||
|
||||
let loader_is_valid = project_loaders
|
||||
.get(&project_id)
|
||||
.is_some_and(|loaders| loaders.contains(&loader));
|
||||
|
||||
if loader_is_valid {
|
||||
loader
|
||||
} else {
|
||||
UNKNOWN_LOADER.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_project_status_change_events(
|
||||
project_ids: &[DBProjectId],
|
||||
time_range: &TimeRange,
|
||||
@@ -423,7 +465,8 @@ async fn fetch_project_status_change_events(
|
||||
WHERE
|
||||
t.mod_id = ANY($1)
|
||||
AND tm.body->>'type' = 'status_change'
|
||||
AND tm.created BETWEEN $2 AND $3
|
||||
AND tm.created >= $2
|
||||
AND tm.created < $3
|
||||
"#,
|
||||
&project_id_values,
|
||||
time_range.start,
|
||||
@@ -462,6 +505,7 @@ pub(crate) struct QueryClickhouseContext<'a> {
|
||||
pub(crate) project_ids: &'a [DBProjectId],
|
||||
pub(crate) parent_version_ids: &'a [DBVersionId],
|
||||
pub(crate) affiliate_code_ids: &'a [DBAffiliateCodeId],
|
||||
pub(crate) project_loaders: &'a HashMap<DBProjectId, HashSet<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
@@ -600,6 +644,9 @@ pub(crate) fn add_to_time_slice(
|
||||
bucket: usize,
|
||||
data: AnalyticsData,
|
||||
) -> Result<(), ApiError> {
|
||||
// Bucketed analytics queries must filter time ranges as `[start, end)`.
|
||||
// `widthBucket` returns `num_time_slices + 1` for values at or after
|
||||
// `end`, which is outside the response slice array.
|
||||
// row.recorded < time_range_start => bucket = 0
|
||||
// row.recorded >= time_range_end => bucket = num_time_slices
|
||||
// (note: this is out of range of `time_slices`!)
|
||||
|
||||
@@ -337,7 +337,7 @@ pub async fn revenue_get(
|
||||
"
|
||||
SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start
|
||||
FROM payouts_values
|
||||
WHERE user_id = $1 AND created BETWEEN $2 AND $3
|
||||
WHERE user_id = $1 AND created >= $2 AND created < $3
|
||||
GROUP by mod_id, interval_start ORDER BY interval_start
|
||||
",
|
||||
user.id.0 as i64,
|
||||
@@ -356,7 +356,7 @@ pub async fn revenue_get(
|
||||
"
|
||||
SELECT mod_id, SUM(amount) amount_sum, DATE_BIN($4::interval, created, TIMESTAMP '2001-01-01') AS interval_start
|
||||
FROM payouts_values
|
||||
WHERE mod_id = ANY($1) AND created BETWEEN $2 AND $3
|
||||
WHERE mod_id = ANY($1) AND created >= $2 AND created < $3
|
||||
GROUP by mod_id, interval_start ORDER BY interval_start
|
||||
",
|
||||
&project_ids.iter().map(|x| x.0 as i64).collect::<Vec<_>>(),
|
||||
|
||||
Reference in New Issue
Block a user