Fix clickhouse URI, country filtering (#6247)

* Switch to bind for long params

* Country filtering

* prepare

* playtime preservation
This commit is contained in:
aecsocket
2026-06-01 17:11:44 +01:00
committed by GitHub
parent 1550dfb3f0
commit c3a58aba9e
5 changed files with 265 additions and 158 deletions
@@ -52,11 +52,11 @@ struct AffiliateCodeClickRow {
const AFFILIATE_CODE_CLICKS: &str = {
const USE_AFFILIATE_CODE_ID: &str = "{use_affiliate_code_id: Bool}";
const FILTER_AFFILIATE_CODE_ID: &str =
"{filter_affiliate_code_id: Array(UInt64)}";
const FILTER_AFFILIATE_CODE_ID: &str = "filter_affiliate_code_id";
formatcp!(
"SELECT
"WITH ? AS {FILTER_AFFILIATE_CODE_ID}
SELECT
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
if({USE_AFFILIATE_CODE_ID}, affiliate_code_id, 0) AS affiliate_code_id,
COUNT(*) AS clicks
@@ -85,7 +85,6 @@ pub(crate) async fn fetch(
ClickhouseQueryParams::empty(),
&[("use_affiliate_code_id", uses(F::AffiliateCodeId))],
vec![ClickhouseFilterParam::AffiliateCodeId(
"filter_affiliate_code_id",
&metrics.filter_by.affiliate_code_id,
)],
|_| true,
@@ -18,16 +18,16 @@ use crate::{
};
use super::super::{
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
condense_country, none_if_empty, none_if_zero_version_id,
normalize_loader_for_project,
COUNTRY_PRIVACY_FLOOR, ClickhouseFilterParam, QueryClickhouseContext,
add_to_time_slice, apply_country_privacy, none_if_empty,
none_if_zero_version_id, normalize_loader_for_project,
};
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
const TIME_SLICES: &str = "{time_slices: UInt64}";
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
const PROJECT_IDS: &str = "project_ids";
/// Fields for [`super::ReturnMetrics::project_downloads`].
#[derive(
@@ -193,16 +193,24 @@ const DOWNLOADS: &str = {
const USE_REASON: &str = "{use_reason: Bool}";
const USE_GAME_VERSION: &str = "{use_game_version: Bool}";
const USE_LOADER: &str = "{use_loader: Bool}";
const FILTER_DOMAIN: &str = "{filter_domain: Array(String)}";
const FILTER_VERSION_ID: &str = "{filter_version_id: Array(UInt64)}";
const FILTER_DOMAIN: &str = "filter_domain";
const FILTER_VERSION_ID: &str = "filter_version_id";
const FILTER_MONETIZED: &str = "{filter_monetized: UInt8}";
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
const FILTER_REASON: &str = "{filter_reason: Array(String)}";
const FILTER_GAME_VERSION: &str = "{filter_game_version: Array(String)}";
const FILTER_LOADER: &str = "{filter_loader: Array(String)}";
const FILTER_COUNTRY: &str = "filter_country";
const FILTER_REASON: &str = "filter_reason";
const FILTER_GAME_VERSION: &str = "filter_game_version";
const FILTER_LOADER: &str = "filter_loader";
formatcp!(
"SELECT
"WITH
? AS {PROJECT_IDS},
? AS {FILTER_DOMAIN},
? AS {FILTER_VERSION_ID},
? AS {FILTER_COUNTRY},
? AS {FILTER_REASON},
? AS {FILTER_GAME_VERSION},
? AS {FILTER_LOADER}
SELECT
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
downloads.project_id AS source_project_id,
if({USE_PROJECT_ID}, downloads.project_id, 0) AS project_id,
@@ -275,39 +283,21 @@ pub(crate) async fn fetch(
.param("time_range_start", cx.req.time_range.start.timestamp())
.param("time_range_end", cx.req.time_range.end.timestamp())
.param("time_slices", cx.time_slices.len())
.param("project_ids", cx.project_ids);
.bind(cx.project_ids);
for (param_name, used) in use_columns {
query = query.param(param_name, used)
}
for filter_param in [
ClickhouseFilterParam::String(
"filter_domain",
&metrics.filter_by.domain,
),
ClickhouseFilterParam::VersionId(
"filter_version_id",
&metrics.filter_by.version_id,
),
ClickhouseFilterParam::String(&metrics.filter_by.domain),
ClickhouseFilterParam::VersionId(&metrics.filter_by.version_id),
ClickhouseFilterParam::Bool(
"filter_monetized",
&metrics.filter_by.monetized,
),
ClickhouseFilterParam::String(
"filter_country",
&metrics.filter_by.country,
),
ClickhouseFilterParam::DownloadReason(
"filter_reason",
&metrics.filter_by.reason,
),
ClickhouseFilterParam::String(
"filter_game_version",
&metrics.filter_by.game_version,
),
ClickhouseFilterParam::String(
"filter_loader",
&metrics.filter_by.loader,
),
ClickhouseFilterParam::String(&metrics.filter_by.country),
ClickhouseFilterParam::DownloadReason(&metrics.filter_by.reason),
ClickhouseFilterParam::String(&metrics.filter_by.game_version),
ClickhouseFilterParam::String(&metrics.filter_by.loader),
] {
query = filter_param.bind(query);
}
@@ -367,7 +357,20 @@ pub(crate) async fn fetch(
*buckets.entry(key).or_default() += row.downloads;
}
for (key, downloads) in buckets {
let mut output_buckets = HashMap::<DownloadBucket, u64>::new();
for (mut key, downloads) in buckets {
if !apply_country_privacy(
&mut key.country,
!metrics.filter_by.country.is_empty(),
downloads,
COUNTRY_PRIVACY_FLOOR,
) {
continue;
}
*output_buckets.entry(key).or_default() += downloads;
}
for (key, downloads) in output_buckets {
add_to_time_slice(
cx.time_slices,
key.bucket as usize,
@@ -380,9 +383,7 @@ pub(crate) async fn fetch(
.version_id
.and_then(none_if_zero_version_id),
monetized: key.monetized,
country: key
.country
.map(|country| condense_country(country, downloads)),
country: key.country,
reason: key.reason,
game_version: key.game_version.and_then(none_if_empty),
loader: key.loader.and_then(none_if_empty),
@@ -10,16 +10,16 @@ use crate::{
};
use super::super::{
ClickhouseFilterParam, QueryClickhouseContext, add_to_time_slice,
condense_country, none_if_empty, none_if_zero_version_id,
normalize_loader_for_project,
COUNTRY_PLAYTIME_PRIVACY_FLOOR_SECONDS, ClickhouseFilterParam,
QueryClickhouseContext, add_to_time_slice, apply_country_privacy,
none_if_empty, none_if_zero_version_id, normalize_loader_for_project,
};
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
const TIME_SLICES: &str = "{time_slices: UInt64}";
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
const PROJECT_IDS: &str = "project_ids";
/// Fields for [`super::ReturnMetrics::project_playtime`].
#[derive(
@@ -96,14 +96,21 @@ const PLAYTIME: &str = {
const USE_LOADER: &str = "{use_loader: Bool}";
const USE_GAME_VERSION: &str = "{use_game_version: Bool}";
const USE_COUNTRY: &str = "{use_country: Bool}";
const PARENT_VERSION_IDS: &str = "{parent_version_ids: Array(UInt64)}";
const FILTER_VERSION_ID: &str = "{filter_version_id: Array(UInt64)}";
const FILTER_LOADER: &str = "{filter_loader: Array(String)}";
const FILTER_GAME_VERSION: &str = "{filter_game_version: Array(String)}";
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
const PARENT_VERSION_IDS: &str = "parent_version_ids";
const FILTER_VERSION_ID: &str = "filter_version_id";
const FILTER_LOADER: &str = "filter_loader";
const FILTER_GAME_VERSION: &str = "filter_game_version";
const FILTER_COUNTRY: &str = "filter_country";
formatcp!(
"SELECT
"WITH
? AS {PROJECT_IDS},
? AS {PARENT_VERSION_IDS},
? AS {FILTER_VERSION_ID},
? AS {FILTER_LOADER},
? AS {FILTER_GAME_VERSION},
? AS {FILTER_COUNTRY}
SELECT
bucket,
source_project_id,
if({USE_PROJECT_ID}, source_project_id, 0) AS project_id,
@@ -194,28 +201,16 @@ pub(crate) async fn fetch(
.param("time_range_start", cx.req.time_range.start.timestamp())
.param("time_range_end", cx.req.time_range.end.timestamp())
.param("time_slices", cx.time_slices.len())
.param("project_ids", cx.project_ids)
.param("parent_version_ids", cx.parent_version_ids);
.bind(cx.project_ids)
.bind(cx.parent_version_ids);
for (param_name, used) in use_columns {
query = query.param(param_name, used)
}
for filter_param in [
ClickhouseFilterParam::VersionId(
"filter_version_id",
&metrics.filter_by.version_id,
),
ClickhouseFilterParam::String(
"filter_loader",
&metrics.filter_by.loader,
),
ClickhouseFilterParam::String(
"filter_game_version",
&metrics.filter_by.game_version,
),
ClickhouseFilterParam::String(
"filter_country",
&metrics.filter_by.country,
),
ClickhouseFilterParam::VersionId(&metrics.filter_by.version_id),
ClickhouseFilterParam::String(&metrics.filter_by.loader),
ClickhouseFilterParam::String(&metrics.filter_by.game_version),
ClickhouseFilterParam::String(&metrics.filter_by.country),
] {
query = filter_param.bind(query);
}
@@ -260,7 +255,20 @@ pub(crate) async fn fetch(
*buckets.entry(key).or_default() += row.seconds;
}
for (key, seconds) in buckets {
let mut output_buckets = HashMap::<PlaytimeBucket, u64>::new();
for (mut key, seconds) in buckets {
if !apply_country_privacy(
&mut key.country,
!metrics.filter_by.country.is_empty(),
seconds,
COUNTRY_PLAYTIME_PRIVACY_FLOOR_SECONDS,
) {
continue;
}
*output_buckets.entry(key).or_default() += seconds;
}
for (key, seconds) in output_buckets {
add_to_time_slice(
cx.time_slices,
key.bucket as usize,
@@ -272,9 +280,7 @@ pub(crate) async fn fetch(
.and_then(none_if_zero_version_id),
loader: key.loader.and_then(none_if_empty),
game_version: key.game_version.and_then(none_if_empty),
country: key
.country
.map(|country| condense_country(country, seconds)),
country: key.country,
seconds,
}),
}),
@@ -1,18 +1,20 @@
use std::collections::HashMap;
use const_format::formatcp;
use serde::{Deserialize, Serialize};
use crate::{database::models::DBProjectId, routes::ApiError};
use super::super::{
ClickhouseFilterParam, ClickhouseQueryParams, QueryClickhouseContext,
condense_country, none_if_empty, query_clickhouse,
COUNTRY_PRIVACY_FLOOR, ClickhouseFilterParam, QueryClickhouseContext,
add_to_time_slice, apply_country_privacy, none_if_empty,
};
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
const TIME_RANGE_START: &str = "{time_range_start: UInt64}";
const TIME_RANGE_END: &str = "{time_range_end: UInt64}";
const TIME_SLICES: &str = "{time_slices: UInt64}";
const PROJECT_IDS: &str = "{project_ids: Array(UInt64)}";
const PROJECT_IDS: &str = "project_ids";
/// Fields for [`super::ReturnMetrics::project_views`].
#[derive(
@@ -81,19 +83,34 @@ struct ViewRow {
views: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ViewBucket {
bucket: u64,
project_id: DBProjectId,
domain: Option<String>,
site_path: Option<String>,
monetized: Option<bool>,
country: Option<String>,
}
const VIEWS: &str = {
const USE_PROJECT_ID: &str = "{use_project_id: Bool}";
const USE_DOMAIN: &str = "{use_domain: Bool}";
const USE_SITE_PATH: &str = "{use_site_path: Bool}";
const USE_MONETIZED: &str = "{use_monetized: Bool}";
const USE_COUNTRY: &str = "{use_country: Bool}";
const FILTER_DOMAIN: &str = "{filter_domain: Array(String)}";
const FILTER_SITE_PATH: &str = "{filter_site_path: Array(String)}";
const FILTER_DOMAIN: &str = "filter_domain";
const FILTER_SITE_PATH: &str = "filter_site_path";
const FILTER_MONETIZED: &str = "{filter_monetized: UInt8}";
const FILTER_COUNTRY: &str = "{filter_country: Array(String)}";
const FILTER_COUNTRY: &str = "filter_country";
formatcp!(
"SELECT
"WITH
? AS {PROJECT_IDS},
? AS {FILTER_DOMAIN},
? AS {FILTER_SITE_PATH},
? AS {FILTER_COUNTRY}
SELECT
widthBucket(toUnixTimestamp(recorded), {TIME_RANGE_START}, {TIME_RANGE_END}, {TIME_SLICES}) AS bucket,
if({USE_PROJECT_ID}, project_id, 0) AS project_id,
if({USE_DOMAIN}, domain, '') AS domain,
@@ -124,59 +141,95 @@ pub(crate) async fn fetch(
) -> Result<(), ApiError> {
use ProjectViewsField as F;
let uses = |field| metrics.bucket_by.contains(&field);
let use_columns = &[
("use_project_id", uses(F::ProjectId)),
("use_domain", uses(F::Domain)),
("use_site_path", uses(F::SitePath)),
("use_monetized", uses(F::Monetized)),
("use_country", uses(F::Country)),
];
let uses_column = |name| {
use_columns
.iter()
.any(|(column_name, used)| *column_name == name && *used)
};
query_clickhouse::<ViewRow>(
cx,
VIEWS,
ClickhouseQueryParams::PROJECT_IDS,
&[
("use_project_id", uses(F::ProjectId)),
("use_domain", uses(F::Domain)),
("use_site_path", uses(F::SitePath)),
("use_monetized", uses(F::Monetized)),
("use_country", uses(F::Country)),
],
vec![
ClickhouseFilterParam::String(
"filter_domain",
&metrics.filter_by.domain,
),
ClickhouseFilterParam::String(
"filter_site_path",
&metrics.filter_by.site_path,
),
ClickhouseFilterParam::Bool(
"filter_monetized",
&metrics.filter_by.monetized,
),
ClickhouseFilterParam::String(
"filter_country",
&metrics.filter_by.country,
),
],
|_| true,
|row| row.bucket,
|row| {
let country = if uses(F::Country) {
Some(condense_country(row.country, row.views))
let mut query = cx
.clickhouse
.query(VIEWS)
.param("time_range_start", cx.req.time_range.start.timestamp())
.param("time_range_end", cx.req.time_range.end.timestamp())
.param("time_slices", cx.time_slices.len())
.bind(cx.project_ids);
for (param_name, used) in use_columns {
query = query.param(param_name, used)
}
for filter_param in [
ClickhouseFilterParam::String(&metrics.filter_by.domain),
ClickhouseFilterParam::String(&metrics.filter_by.site_path),
ClickhouseFilterParam::Bool(
"filter_monetized",
&metrics.filter_by.monetized,
),
ClickhouseFilterParam::String(&metrics.filter_by.country),
] {
query = filter_param.bind(query);
}
let mut cursor = query.fetch::<ViewRow>()?;
let mut buckets = HashMap::<ViewBucket, u64>::new();
while let Some(row) = cursor.next().await? {
let key = ViewBucket {
bucket: row.bucket,
project_id: row.project_id,
domain: uses_column("use_domain").then(|| row.domain.clone()),
site_path: uses_column("use_site_path")
.then(|| row.site_path.clone()),
monetized: if uses_column("use_monetized") {
match row.monetized {
0 => Some(false),
1 => Some(true),
_ => None,
}
} else {
None
};
},
country: uses_column("use_country").then(|| row.country.clone()),
};
*buckets.entry(key).or_default() += row.views;
}
let mut output_buckets = HashMap::<ViewBucket, u64>::new();
for (mut key, views) in buckets {
if !apply_country_privacy(
&mut key.country,
!metrics.filter_by.country.is_empty(),
views,
COUNTRY_PRIVACY_FLOOR,
) {
continue;
}
*output_buckets.entry(key).or_default() += views;
}
for (key, views) in output_buckets {
add_to_time_slice(
cx.time_slices,
key.bucket as usize,
AnalyticsData::Project(ProjectAnalytics {
source_project: row.project_id.into(),
source_project: key.project_id.into(),
metrics: ProjectMetrics::Views(ProjectViews {
domain: none_if_empty(row.domain),
site_path: none_if_empty(row.site_path),
monetized: match row.monetized {
0 => Some(false),
1 => Some(true),
_ => None,
},
country,
views: row.views,
domain: key.domain.and_then(none_if_empty),
site_path: key.site_path.and_then(none_if_empty),
monetized: key.monetized,
country: key.country,
views,
}),
})
},
)
.await
}),
)?;
}
Ok(())
}
@@ -114,6 +114,9 @@ pub const MIN_RESOLUTION: TimeDelta = TimeDelta::minutes(60);
/// [`TimeRange::resolution`].
pub const MAX_TIME_SLICES: usize = 1024;
pub(crate) const UNKNOWN_LOADER: &str = "unknown";
pub(crate) const UNKNOWN_COUNTRY: &str = "XX";
pub(crate) const COUNTRY_PRIVACY_FLOOR: u64 = 50;
pub(crate) const COUNTRY_PLAYTIME_PRIVACY_FLOOR_SECONDS: u64 = 4 * 60 * 60;
// response
@@ -403,13 +406,25 @@ pub(crate) fn none_if_zero_version_id(v: DBVersionId) -> Option<VersionId> {
if v.0 == 0 { None } else { Some(v.into()) }
}
pub(crate) fn condense_country(country: String, count: u64) -> String {
// Every country under '50' (view or downloads) should be condensed into 'XX'
if count < 50 {
"XX".to_string()
} else {
country
pub(crate) fn apply_country_privacy(
country: &mut Option<String>,
country_filter_applied: bool,
count: u64,
floor: u64,
) -> bool {
if count >= floor {
return true;
}
if country_filter_applied {
return false;
}
if country.is_some() {
*country = Some(UNKNOWN_COUNTRY.to_string());
}
true
}
pub(crate) fn project_loader_map(
@@ -517,11 +532,11 @@ pub(crate) struct ClickhouseQueryParams {
}
pub(crate) enum ClickhouseFilterParam<'a> {
String(&'static str, &'a [String]),
String(&'a [String]),
Bool(&'static str, &'a [bool]),
VersionId(&'static str, &'a [VersionId]),
AffiliateCodeId(&'static str, &'a [AffiliateCodeId]),
DownloadReason(&'static str, &'a [DownloadReason]),
VersionId(&'a [VersionId]),
AffiliateCodeId(&'a [AffiliateCodeId]),
DownloadReason(&'a [DownloadReason]),
}
impl ClickhouseFilterParam<'_> {
@@ -530,7 +545,7 @@ impl ClickhouseFilterParam<'_> {
query: clickhouse::query::Query,
) -> clickhouse::query::Query {
match self {
Self::String(name, values) => query.param(name, values),
Self::String(values) => query.bind(values),
Self::Bool(name, values) => {
let value = match values {
[false] => 0,
@@ -539,36 +554,30 @@ impl ClickhouseFilterParam<'_> {
};
query.param(name, value)
}
Self::VersionId(name, values) => {
Self::VersionId(values) => {
let values = values
.iter()
.map(|id| DBVersionId::from(*id))
.collect::<Vec<_>>();
query.param(name, values)
query.bind(values)
}
Self::AffiliateCodeId(name, values) => {
Self::AffiliateCodeId(values) => {
let values = values
.iter()
.map(|id| DBAffiliateCodeId::from(*id))
.collect::<Vec<_>>();
query.param(name, values)
query.bind(values)
}
Self::DownloadReason(name, values) => {
Self::DownloadReason(values) => {
let values =
values.iter().map(ToString::to_string).collect::<Vec<_>>();
query.param(name, values)
query.bind(values)
}
}
}
}
impl ClickhouseQueryParams {
pub(crate) const PROJECT_IDS: Self = Self {
project_ids: true,
parent_version_ids: false,
affiliate_code_ids: false,
};
pub(crate) const fn empty() -> Self {
Self {
project_ids: false,
@@ -613,13 +622,13 @@ where
.param("time_range_end", cx.req.time_range.end.timestamp())
.param("time_slices", cx.time_slices.len());
if params.project_ids {
query = query.param("project_ids", cx.project_ids);
query = query.bind(cx.project_ids);
}
if params.parent_version_ids {
query = query.param("parent_version_ids", cx.parent_version_ids);
query = query.bind(cx.parent_version_ids);
}
if params.affiliate_code_ids {
query = query.param("affiliate_code_ids", cx.affiliate_code_ids);
query = query.bind(cx.affiliate_code_ids);
}
for (param_name, used) in use_columns {
query = query.param(param_name, used)
@@ -794,6 +803,45 @@ mod tests {
);
}
#[test]
fn country_privacy_floor_suppresses_small_constrained_buckets() {
let mut country = None;
assert!(apply_country_privacy(
&mut country,
false,
1,
COUNTRY_PRIVACY_FLOOR
));
assert_eq!(country, None);
let mut country = Some("US".into());
assert!(apply_country_privacy(
&mut country,
false,
49,
COUNTRY_PRIVACY_FLOOR
));
assert_eq!(country, Some("XX".into()));
let mut country = Some("US".into());
assert!(!apply_country_privacy(
&mut country,
true,
49,
COUNTRY_PRIVACY_FLOOR
));
assert_eq!(country, Some("US".into()));
let mut country = Some("US".into());
assert!(apply_country_privacy(
&mut country,
true,
50,
COUNTRY_PRIVACY_FLOOR
));
assert_eq!(country, Some("US".into()));
}
#[test]
fn response_format() {
let test_project_1 = ProjectId(123);