Allow revenue analytics to bucket by user ID (#6428)

* adjust

* convert to sqlx macros

* Allow users on a team to see rev splits of other users

* prepare

* clarify comment
This commit is contained in:
aecsocket
2026-06-18 13:12:56 +01:00
committed by GitHub
parent 486b467af2
commit b3257a0614
9 changed files with 255 additions and 60 deletions
@@ -0,0 +1,39 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS \"bucket?\",\n CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS \"affiliate_code_source?\",\n SUM(amount) AS \"amount_sum?\"\n FROM payouts_values\n WHERE\n user_id = $4\n AND payouts_values.affiliate_code_source IS NOT NULL\n AND created >= $1\n AND created < $2\n AND (cardinality($6::bigint[]) = 0 OR affiliate_code_source = ANY($6))\n GROUP BY 1, 2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "bucket?",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "affiliate_code_source?",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "amount_sum?",
"type_info": "Numeric"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Int4",
"Int8",
"Bool",
"Int8Array"
]
},
"nullable": [
null,
null,
null
]
},
"hash": "52431524eac4f9d821fd0ee0ca5acbc88c191dd7660d5ef7cd6376f36f3a49af"
}
@@ -0,0 +1,39 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM usa.created_at)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS \"bucket?\",\n CASE WHEN $5 THEN affiliate_code ELSE 0 END AS \"affiliate_code?\",\n COUNT(*) AS \"conversions?\"\n FROM users_subscriptions_affiliations usa\n INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code\n INNER JOIN users_subscriptions us ON us.id = usa.subscription_id\n INNER JOIN charges c ON c.subscription_id = us.id\n WHERE\n ac.affiliate = $4\n AND usa.created_at >= $1\n AND usa.created_at < $2\n AND c.status = 'succeeded'\n AND (cardinality($6::bigint[]) = 0 OR affiliate_code = ANY($6))\n GROUP BY 1, 2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "bucket?",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "affiliate_code?",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "conversions?",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Int4",
"Int8",
"Bool",
"Int8Array"
]
},
"nullable": [
null,
null,
null
]
},
"hash": "8c2bc4b2b4c659091415058357eebd31e3546e74ff948ddbc76484178dc2956c"
}
@@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT m.id\n FROM mods m\n INNER JOIN team_members tm ON tm.team_id = m.team_id\n WHERE\n m.id = ANY($1)\n AND tm.user_id = $2\n AND tm.accepted\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Int8Array",
"Int8"
]
},
"nullable": [
false
]
},
"hash": "a47f1d4140d9b93d99934c0b2af0ab0ae9119509d6fc72760e5f0390bfe40543"
}
@@ -0,0 +1,46 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n WIDTH_BUCKET(\n EXTRACT(EPOCH FROM created)::bigint,\n EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,\n $3::integer\n ) AS \"bucket?\",\n mod_id AS \"mod_id?\",\n CASE\n WHEN $5 AND ($6 OR mod_id = ANY($7)) THEN user_id\n ELSE 0\n END AS \"user_id?\",\n SUM(amount) AS \"amount_sum?\"\n FROM payouts_values\n WHERE\n -- only project revenue is counted here\n -- for affiliate code revenue, see `affiliate_code_revenue`\n payouts_values.mod_id IS NOT NULL\n AND payouts_values.mod_id = ANY($4)\n AND created >= $1\n AND created < $2\n GROUP BY 1, 2, 3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "bucket?",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "mod_id?",
"type_info": "Int8"
},
{
"ordinal": 2,
"name": "user_id?",
"type_info": "Int8"
},
{
"ordinal": 3,
"name": "amount_sum?",
"type_info": "Numeric"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Int4",
"Int8Array",
"Bool",
"Bool",
"Int8Array"
]
},
"nullable": [
null,
true,
null,
null
]
},
"hash": "c2e40b00e2764be89a162feb1f1cc927e873e51a0d3b6e3e142efa3ba9e0e76f"
}
+1
View File
@@ -25,3 +25,4 @@
- `Modrinth-Admin: feedbeef` as admin key
- If some steps require you to create a project/mod or version for testing, ask the user to go into the web frontend and manually create a project/version
- When using `sqlx::query` etc. always use the macro form like `sqlx::query!` or `sqlx::query_scalar!` - never the plain function form. Avoid using `query_as!`.
- Do not run `cargo test`, even for a single specific test, unless explicitly prompted to by the user, since it takes a long time to run.
@@ -1,6 +1,5 @@
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use sqlx::Row;
use crate::{
database::{
@@ -59,16 +58,17 @@ pub(crate) async fn fetch(
.iter()
.map(|id| DBAffiliateCodeId::from(*id).0)
.collect::<Vec<_>>();
let mut rows = sqlx::query(
"SELECT
let mut rows = sqlx::query!(
r#"
SELECT
WIDTH_BUCKET(
EXTRACT(EPOCH FROM usa.created_at)::bigint,
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
$3::integer
) AS bucket,
CASE WHEN $5 THEN affiliate_code ELSE 0 END AS affiliate_code,
COUNT(*) AS conversions
) AS "bucket?",
CASE WHEN $5 THEN affiliate_code ELSE 0 END AS "affiliate_code?",
COUNT(*) AS "conversions?"
FROM users_subscriptions_affiliations usa
INNER JOIN affiliate_codes ac ON ac.id = usa.affiliate_code
INNER JOIN users_subscriptions us ON us.id = usa.subscription_id
@@ -79,22 +79,21 @@ pub(crate) async fn fetch(
AND usa.created_at < $2
AND c.status = 'succeeded'
AND (cardinality($6::bigint[]) = 0 OR affiliate_code = ANY($6))
GROUP BY bucket, affiliate_code",
)
.bind(req.time_range.start)
.bind(req.time_range.end)
.bind(num_time_slices as i64)
.bind(user_id as DBUserId)
.bind(
GROUP BY 1, 2
"#,
req.time_range.start,
req.time_range.end,
num_time_slices as i64,
user_id as DBUserId,
metrics
.bucket_by
.contains(&AffiliateCodeConversionsField::AffiliateCodeId),
&filter_affiliate_code_ids,
)
.bind(&filter_affiliate_code_ids)
.fetch(pool);
while let Some(row) = rows.next().await.transpose()? {
let bucket = row
.try_get::<Option<i32>, _>("bucket")?
.bucket
.wrap_internal_err("bucket should be non-null - query bug!")?;
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
eyre::eyre!(
@@ -102,12 +101,10 @@ pub(crate) async fn fetch(
)
})?;
let affiliate_code = row.try_get::<Option<i64>, _>("affiliate_code")?;
let conversion_count = row.try_get::<Option<i64>, _>("conversions")?;
let source_affiliate_code = AffiliateCodeId::from(DBAffiliateCodeId(
affiliate_code.unwrap_or_default(),
row.affiliate_code.unwrap_or_default(),
));
let conversions = u64::try_from(conversion_count.unwrap_or_default())
let conversions = u64::try_from(row.conversions.unwrap_or_default())
.unwrap_or(u64::MAX);
add_to_time_slice(
@@ -1,7 +1,6 @@
use futures::StreamExt;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use sqlx::Row;
use crate::{
database::{
@@ -57,16 +56,17 @@ pub(crate) async fn fetch(
.iter()
.map(|id| DBAffiliateCodeId::from(*id).0)
.collect::<Vec<_>>();
let mut rows = sqlx::query(
"SELECT
let mut rows = sqlx::query!(
r#"
SELECT
WIDTH_BUCKET(
EXTRACT(EPOCH FROM created)::bigint,
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
$3::integer
) AS bucket,
CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS affiliate_code_source,
SUM(amount) amount_sum
) AS "bucket?",
CASE WHEN $5 THEN affiliate_code_source ELSE 0 END AS "affiliate_code_source?",
SUM(amount) AS "amount_sum?"
FROM payouts_values
WHERE
user_id = $4
@@ -74,22 +74,21 @@ pub(crate) async fn fetch(
AND created >= $1
AND created < $2
AND (cardinality($6::bigint[]) = 0 OR affiliate_code_source = ANY($6))
GROUP BY bucket, affiliate_code_source",
)
.bind(req.time_range.start)
.bind(req.time_range.end)
.bind(num_time_slices as i64)
.bind(user_id as DBUserId)
.bind(
GROUP BY 1, 2
"#,
req.time_range.start,
req.time_range.end,
num_time_slices as i64,
user_id as DBUserId,
metrics
.bucket_by
.contains(&AffiliateCodeRevenueField::AffiliateCodeId),
&filter_affiliate_code_ids,
)
.bind(&filter_affiliate_code_ids)
.fetch(pool);
while let Some(row) = rows.next().await.transpose()? {
let bucket = row
.try_get::<Option<i32>, _>("bucket")?
.bucket
.wrap_internal_err("bucket should be non-null - query bug!")?;
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
eyre::eyre!(
@@ -97,14 +96,10 @@ pub(crate) async fn fetch(
)
})?;
let affiliate_code_source =
row.try_get::<Option<i64>, _>("affiliate_code_source")?;
let source_affiliate_code = AffiliateCodeId::from(DBAffiliateCodeId(
affiliate_code_source.unwrap_or_default(),
row.affiliate_code_source.unwrap_or_default(),
));
let revenue = row
.try_get::<Option<Decimal>, _>("amount_sum")?
.unwrap_or_default();
let revenue = row.amount_sum.unwrap_or_default();
add_to_time_slice(
time_slices,
@@ -1,17 +1,21 @@
use futures::StreamExt;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use sqlx::Row;
use ariadne::ids::UserId;
use crate::{
database::{PgPool, models::DBProjectId},
database::{
PgPool,
models::{DBProjectId, DBUserId},
},
models::ids::ProjectId,
routes::ApiError,
util::error::Context,
};
use super::super::{TimeSlice, add_to_time_slice};
use super::{AnalyticsData, ProjectAnalytics, ProjectMetrics};
use super::{AnalyticsData, Metrics, ProjectAnalytics, ProjectMetrics};
/// Fields for [`super::ReturnMetrics::project_revenue`].
#[derive(
@@ -21,6 +25,13 @@ use super::{AnalyticsData, ProjectAnalytics, ProjectMetrics};
pub enum ProjectRevenueField {
/// Project ID.
ProjectId,
/// User ID.
///
/// You can only bucket by user if you are a member on the project.
/// If you are a member of the parent organization (and have view analytics
/// permissions), but not a member of the project, you cannot bucket by
/// user.
UserId,
}
/// Filters for [`super::ReturnMetrics::project_revenue`].
@@ -30,6 +41,9 @@ pub struct ProjectRevenueFilters {}
/// [`super::ReturnMetrics::project_revenue`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)]
pub struct ProjectRevenue {
/// User these metrics are for.
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<UserId>,
/// Total revenue for this bucket.
pub(crate) revenue: Decimal,
}
@@ -40,17 +54,28 @@ pub(crate) async fn fetch(
req: &super::super::GetRequest,
num_time_slices: usize,
project_id_values: &[i64],
user_id_bucket_project_ids: &[i64],
can_view_all_revenue_splits: bool,
metrics: &Metrics<ProjectRevenueField, ProjectRevenueFilters>,
) -> Result<(), ApiError> {
let mut rows = sqlx::query(
"SELECT
let bucket_by_user_id =
metrics.bucket_by.contains(&ProjectRevenueField::UserId);
let mut rows = sqlx::query!(
r#"
SELECT
WIDTH_BUCKET(
EXTRACT(EPOCH FROM created)::bigint,
EXTRACT(EPOCH FROM $1::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
EXTRACT(EPOCH FROM $2::timestamp with time zone AT TIME ZONE 'UTC')::bigint,
$3::integer
) AS bucket,
mod_id,
SUM(amount) amount_sum
) AS "bucket?",
mod_id AS "mod_id?",
CASE
WHEN $5 AND ($6 OR mod_id = ANY($7)) THEN user_id
ELSE 0
END AS "user_id?",
SUM(amount) AS "amount_sum?"
FROM payouts_values
WHERE
-- only project revenue is counted here
@@ -59,16 +84,20 @@ pub(crate) async fn fetch(
AND payouts_values.mod_id = ANY($4)
AND created >= $1
AND created < $2
GROUP BY bucket, mod_id",
GROUP BY 1, 2, 3
"#,
req.time_range.start,
req.time_range.end,
num_time_slices as i64,
project_id_values,
bucket_by_user_id,
can_view_all_revenue_splits,
user_id_bucket_project_ids,
)
.bind(req.time_range.start)
.bind(req.time_range.end)
.bind(num_time_slices as i64)
.bind(project_id_values)
.fetch(pool);
while let Some(row) = rows.next().await.transpose()? {
let bucket = row
.try_get::<Option<i32>, _>("bucket")?
.bucket
.wrap_internal_err("bucket should be non-null - query bug!")?;
let bucket = usize::try_from(bucket).wrap_internal_err_with(|| {
eyre::eyre!(
@@ -76,11 +105,9 @@ pub(crate) async fn fetch(
)
})?;
let mod_id = row.try_get::<Option<i64>, _>("mod_id")?;
let amount_sum = row.try_get::<Option<Decimal>, _>("amount_sum")?;
if let Some(source_project) =
mod_id.map(DBProjectId).map(ProjectId::from)
&& let Some(revenue) = amount_sum
row.mod_id.map(DBProjectId).map(ProjectId::from)
&& let Some(revenue) = row.amount_sum
{
add_to_time_slice(
time_slices,
@@ -88,6 +115,11 @@ pub(crate) async fn fetch(
AnalyticsData::Project(ProjectAnalytics {
source_project,
metrics: ProjectMetrics::Revenue(ProjectRevenue {
user_id: row
.user_id
.filter(|id| bucket_by_user_id && *id != 0)
.map(DBUserId)
.map(UserId::from),
revenue,
}),
}),
@@ -16,7 +16,7 @@ use std::{
num::NonZeroU64,
};
use crate::database::PgPool;
use crate::database::{PgPool, models::DBUserId};
use actix_web::{HttpRequest, post, web};
use chrono::{DateTime, TimeDelta, Utc};
use eyre::eyre;
@@ -355,17 +355,39 @@ pub async fn fetch_analytics(
.await?;
}
if req.return_metrics.project_revenue.is_some() {
if let Some(metrics) = &req.return_metrics.project_revenue {
if !scopes.contains(Scopes::PAYOUTS_READ) {
return Err(AuthenticationError::InvalidCredentials.into());
}
let user_id_bucket_project_ids = sqlx::query!(
"
SELECT m.id
FROM mods m
INNER JOIN team_members tm ON tm.team_id = m.team_id
WHERE
m.id = ANY($1)
AND tm.user_id = $2
AND tm.accepted
",
&project_id_values,
DBUserId::from(user.id).0,
)
.fetch_all(&**pool)
.await?
.into_iter()
.map(|row| row.id)
.collect::<Vec<_>>();
metrics::fetch_project_revenue(
&pool,
&mut time_slices,
&req,
num_time_slices,
&project_id_values,
&user_id_bucket_project_ids,
user.role.is_mod(),
metrics,
)
.await?;
}
@@ -965,6 +987,7 @@ mod tests {
TimeSlice(vec![AnalyticsData::Project(ProjectAnalytics {
source_project: test_project_3,
metrics: ProjectMetrics::Revenue(ProjectRevenue {
user_id: None,
revenue: Decimal::new(20000, 2),
}),
})]),