You've already forked AstralRinth
forked from didirus/AstralRinth
Offers, redemption, preview subscriptions (#4121)
* Initial db migration/impl, guarded partner routes * Add guard to /redeem * Add `public` column to products prices, only expose public prices * Query cache * Add partner subscription type * 5 days subscription interval, metadata * Create server on redeem * Query cache * Fix race condition * Unprovision Medal subscriptions * Consider due expiring charge as unprovisionable * Query cache * Use a queue * Promote to full subscription, fmt + clippy * Patch expiring charge on promotion, comments * Additional comments * Add `tags` field to Archon /create request * Address review comments * Query cache * Final fixes to edit_subscription * Appease clippy * fmt
This commit is contained in:
committed by
GitHub
parent
c02b809601
commit
9497ba70a4
@@ -197,7 +197,7 @@ impl DBCharge {
|
||||
) -> Result<Option<DBCharge>, DatabaseError> {
|
||||
let user_subscription_id = user_subscription_id.0;
|
||||
let res = select_charges_with_predicate!(
|
||||
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'cancelled' OR status = 'failed')",
|
||||
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'expiring' OR status = 'cancelled' OR status = 'failed')",
|
||||
user_subscription_id
|
||||
)
|
||||
.fetch_optional(exec)
|
||||
@@ -240,6 +240,7 @@ impl DBCharge {
|
||||
charge_type = $1 AND
|
||||
(
|
||||
(status = 'cancelled' AND due < NOW()) OR
|
||||
(status = 'expiring' AND due < NOW()) OR
|
||||
(status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')
|
||||
)
|
||||
"#,
|
||||
|
||||
@@ -25,6 +25,7 @@ pub mod team_item;
|
||||
pub mod thread_item;
|
||||
pub mod user_item;
|
||||
pub mod user_subscription_item;
|
||||
pub mod users_redeemals;
|
||||
pub mod version_item;
|
||||
|
||||
pub use collection_item::DBCollection;
|
||||
|
||||
@@ -57,6 +57,26 @@ impl DBProduct {
|
||||
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
|
||||
}
|
||||
|
||||
pub async fn get_by_type<'a, E>(
|
||||
exec: E,
|
||||
r#type: &str,
|
||||
) -> Result<Vec<Self>, DatabaseError>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let maybe_row = select_products_with_predicate!(
|
||||
"WHERE metadata ->> 'type' = $1",
|
||||
r#type
|
||||
)
|
||||
.fetch_all(exec)
|
||||
.await?;
|
||||
|
||||
maybe_row
|
||||
.into_iter()
|
||||
.map(|r| r.try_into().map_err(Into::into))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub async fn get_many(
|
||||
ids: &[DBProductId],
|
||||
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
|
||||
@@ -100,10 +120,11 @@ pub struct QueryProductWithPrices {
|
||||
}
|
||||
|
||||
impl QueryProductWithPrices {
|
||||
pub async fn list<'a, E>(
|
||||
/// Lists products with at least one public price.
|
||||
pub async fn list_purchaseable<'a, E>(
|
||||
exec: E,
|
||||
redis: &RedisPool,
|
||||
) -> Result<Vec<QueryProductWithPrices>, DatabaseError>
|
||||
) -> Result<Vec<Self>, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
|
||||
{
|
||||
@@ -118,7 +139,51 @@ impl QueryProductWithPrices {
|
||||
}
|
||||
|
||||
let all_products = product_item::DBProduct::get_all(exec).await?;
|
||||
let prices = product_item::DBProductPrice::get_all_products_prices(
|
||||
let prices =
|
||||
product_item::DBProductPrice::get_all_public_products_prices(
|
||||
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
|
||||
exec,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let products = all_products
|
||||
.into_iter()
|
||||
.filter_map(|x| {
|
||||
Some(QueryProductWithPrices {
|
||||
id: x.id,
|
||||
metadata: x.metadata,
|
||||
prices: prices
|
||||
.remove(&x.id)
|
||||
.map(|x| x.1)?
|
||||
.into_iter()
|
||||
.map(|x| DBProductPrice {
|
||||
id: x.id,
|
||||
product_id: x.product_id,
|
||||
prices: x.prices,
|
||||
currency_code: x.currency_code,
|
||||
})
|
||||
.collect(),
|
||||
unitary: x.unitary,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
redis
|
||||
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
|
||||
.await?;
|
||||
|
||||
Ok(products)
|
||||
}
|
||||
|
||||
pub async fn list_by_product_type<'a, E>(
|
||||
exec: E,
|
||||
r#type: &str,
|
||||
) -> Result<Vec<Self>, DatabaseError>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a> + Copy,
|
||||
{
|
||||
let all_products = DBProduct::get_by_type(exec, r#type).await?;
|
||||
let prices = DBProductPrice::get_all_products_prices(
|
||||
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
|
||||
exec,
|
||||
)
|
||||
@@ -126,29 +191,26 @@ impl QueryProductWithPrices {
|
||||
|
||||
let products = all_products
|
||||
.into_iter()
|
||||
.map(|x| QueryProductWithPrices {
|
||||
id: x.id,
|
||||
metadata: x.metadata,
|
||||
prices: prices
|
||||
.remove(&x.id)
|
||||
.map(|x| x.1)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|x| DBProductPrice {
|
||||
id: x.id,
|
||||
product_id: x.product_id,
|
||||
prices: x.prices,
|
||||
currency_code: x.currency_code,
|
||||
})
|
||||
.collect(),
|
||||
unitary: x.unitary,
|
||||
.filter_map(|x| {
|
||||
Some(QueryProductWithPrices {
|
||||
id: x.id,
|
||||
metadata: x.metadata,
|
||||
prices: prices
|
||||
.remove(&x.id)
|
||||
.map(|x| x.1)?
|
||||
.into_iter()
|
||||
.map(|x| DBProductPrice {
|
||||
id: x.id,
|
||||
product_id: x.product_id,
|
||||
prices: x.prices,
|
||||
currency_code: x.currency_code,
|
||||
})
|
||||
.collect(),
|
||||
unitary: x.unitary,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
redis
|
||||
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
|
||||
.await?;
|
||||
|
||||
Ok(products)
|
||||
}
|
||||
}
|
||||
@@ -169,7 +231,11 @@ struct ProductPriceQueryResult {
|
||||
}
|
||||
|
||||
macro_rules! select_prices_with_predicate {
|
||||
($predicate:tt, $param:ident) => {
|
||||
($predicate:tt, $param1:ident) => {
|
||||
select_prices_with_predicate!($predicate, $param1, )
|
||||
};
|
||||
|
||||
($predicate:tt, $($param:ident,)+) => {
|
||||
sqlx::query_as!(
|
||||
ProductPriceQueryResult,
|
||||
r#"
|
||||
@@ -177,7 +243,7 @@ macro_rules! select_prices_with_predicate {
|
||||
FROM products_prices
|
||||
"#
|
||||
+ $predicate,
|
||||
$param
|
||||
$($param),+
|
||||
)
|
||||
};
|
||||
}
|
||||
@@ -231,33 +297,81 @@ impl DBProductPrice {
|
||||
Ok(res.remove(&product_id).map(|x| x.1).unwrap_or_default())
|
||||
}
|
||||
|
||||
pub async fn get_all_public_product_prices(
|
||||
product_id: DBProductId,
|
||||
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
|
||||
) -> Result<Vec<DBProductPrice>, DatabaseError> {
|
||||
let res =
|
||||
Self::get_all_public_products_prices(&[product_id], exec).await?;
|
||||
|
||||
Ok(res.remove(&product_id).map(|x| x.1).unwrap_or_default())
|
||||
}
|
||||
|
||||
/// Gets all public prices for the given products. If a product has no public price,
|
||||
/// it won't be included in the resulting map.
|
||||
pub async fn get_all_public_products_prices(
|
||||
product_ids: &[DBProductId],
|
||||
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
|
||||
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
|
||||
Self::get_all_products_prices_with_visibility(
|
||||
product_ids,
|
||||
Some(true),
|
||||
exec,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_all_products_prices(
|
||||
product_ids: &[DBProductId],
|
||||
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
|
||||
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
|
||||
Self::get_all_products_prices_with_visibility(product_ids, None, exec)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_all_products_prices_with_visibility(
|
||||
product_ids: &[DBProductId],
|
||||
public_filter: Option<bool>,
|
||||
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
|
||||
) -> Result<DashMap<DBProductId, Vec<DBProductPrice>>, DatabaseError> {
|
||||
let ids = product_ids.iter().map(|id| id.0).collect_vec();
|
||||
let ids_ref: &[i64] = &ids;
|
||||
|
||||
use futures_util::TryStreamExt;
|
||||
let prices = select_prices_with_predicate!(
|
||||
"WHERE product_id = ANY($1::bigint[])",
|
||||
ids_ref
|
||||
)
|
||||
.fetch(exec)
|
||||
.try_fold(
|
||||
DashMap::new(),
|
||||
|acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
|
||||
if let Ok(item) = <ProductPriceQueryResult as TryInto<
|
||||
DBProductPrice,
|
||||
>>::try_into(x)
|
||||
{
|
||||
acc.entry(item.product_id).or_default().push(item);
|
||||
}
|
||||
|
||||
async move { Ok(acc) }
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let predicate = |acc: DashMap<DBProductId, Vec<DBProductPrice>>, x| {
|
||||
if let Ok(item) = <ProductPriceQueryResult as TryInto<
|
||||
DBProductPrice,
|
||||
>>::try_into(x)
|
||||
{
|
||||
acc.entry(item.product_id).or_default().push(item);
|
||||
}
|
||||
|
||||
async move { Ok(acc) }
|
||||
};
|
||||
|
||||
let prices = match public_filter {
|
||||
None => {
|
||||
select_prices_with_predicate!(
|
||||
"WHERE product_id = ANY($1::bigint[])",
|
||||
ids_ref,
|
||||
)
|
||||
.fetch(exec)
|
||||
.try_fold(DashMap::new(), predicate)
|
||||
.await?
|
||||
}
|
||||
|
||||
Some(public) => {
|
||||
select_prices_with_predicate!(
|
||||
"WHERE product_id = ANY($1::bigint[]) AND public = $2",
|
||||
ids_ref,
|
||||
public,
|
||||
)
|
||||
.fetch(exec)
|
||||
.try_fold(DashMap::new(), predicate)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(prices)
|
||||
}
|
||||
|
||||
299
apps/labrinth/src/database/models/users_redeemals.rs
Normal file
299
apps/labrinth/src/database/models/users_redeemals.rs
Normal file
@@ -0,0 +1,299 @@
|
||||
use crate::database::models::DBUserId;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{query, query_scalar};
|
||||
use std::fmt;
|
||||
|
||||
#[derive(
|
||||
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Offer {
|
||||
#[default]
|
||||
Medal,
|
||||
}
|
||||
|
||||
impl Offer {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Offer::Medal => "medal",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_str_or_default(s: &str) -> Self {
|
||||
match s {
|
||||
"medal" => Offer::Medal,
|
||||
_ => Offer::Medal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Offer {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Status {
|
||||
#[default]
|
||||
Pending,
|
||||
Processing,
|
||||
Processed,
|
||||
}
|
||||
|
||||
impl Status {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Status::Pending => "pending",
|
||||
Status::Processing => "processing",
|
||||
Status::Processed => "processed",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_str_or_default(s: &str) -> Self {
|
||||
match s {
|
||||
"pending" => Status::Pending,
|
||||
"processing" => Status::Processing,
|
||||
"processed" => Status::Processed,
|
||||
_ => Status::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Status {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct UserRedeemal {
|
||||
pub id: i32,
|
||||
pub user_id: DBUserId,
|
||||
pub offer: Offer,
|
||||
pub redeemed: DateTime<Utc>,
|
||||
pub last_attempt: Option<DateTime<Utc>>,
|
||||
pub n_attempts: i32,
|
||||
pub status: Status,
|
||||
}
|
||||
|
||||
impl UserRedeemal {
|
||||
pub async fn get_pending<'a, E>(
|
||||
exec: E,
|
||||
limit: i64,
|
||||
) -> sqlx::Result<Vec<UserRedeemal>>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let redeemals = query!(
|
||||
r#"SELECT * FROM users_redeemals WHERE status = $1 LIMIT $2"#,
|
||||
Status::Pending.as_str(),
|
||||
limit
|
||||
)
|
||||
.fetch_all(exec)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| UserRedeemal {
|
||||
id: row.id,
|
||||
user_id: DBUserId(row.user_id),
|
||||
offer: Offer::from_str_or_default(&row.offer),
|
||||
redeemed: row.redeemed,
|
||||
last_attempt: row.last_attempt,
|
||||
n_attempts: row.n_attempts,
|
||||
status: Status::from_str_or_default(&row.status),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(redeemals)
|
||||
}
|
||||
|
||||
pub async fn update_stuck_5_minutes<'a, E>(exec: E) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
query!(
|
||||
r#"
|
||||
UPDATE users_redeemals
|
||||
SET status = $1
|
||||
WHERE
|
||||
status = $2
|
||||
AND NOW() - last_attempt > INTERVAL '5 minutes'
|
||||
"#,
|
||||
Status::Pending.as_str(),
|
||||
Status::Processing.as_str(),
|
||||
)
|
||||
.execute(exec)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn exists_by_user_and_offer<'a, E>(
|
||||
exec: E,
|
||||
user_id: DBUserId,
|
||||
offer: Offer,
|
||||
) -> sqlx::Result<bool>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
query_scalar!(
|
||||
r#"SELECT
|
||||
EXISTS (
|
||||
SELECT
|
||||
1
|
||||
FROM
|
||||
users_redeemals
|
||||
WHERE
|
||||
user_id = $1
|
||||
AND offer = $2
|
||||
) AS "exists!"
|
||||
"#,
|
||||
user_id.0,
|
||||
offer.as_str(),
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn insert<'a, E>(&mut self, exec: E) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let query = query_scalar!(
|
||||
r#"INSERT INTO users_redeemals
|
||||
(user_id, offer, redeemed, status, last_attempt, n_attempts)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id
|
||||
"#,
|
||||
self.user_id.0,
|
||||
self.offer.as_str(),
|
||||
self.redeemed,
|
||||
self.status.as_str(),
|
||||
self.last_attempt,
|
||||
self.n_attempts,
|
||||
);
|
||||
|
||||
let id = query.fetch_one(exec).await?;
|
||||
|
||||
self.id = id;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates `status`, `last_attempt`, and `n_attempts` only if `status` is currently pending.
|
||||
/// Returns `true` if the status was updated, `false` otherwise.
|
||||
pub async fn update_status_if_pending<'a, E>(
|
||||
&self,
|
||||
exec: E,
|
||||
) -> sqlx::Result<bool>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let query = query!(
|
||||
r#"UPDATE users_redeemals
|
||||
SET
|
||||
status = $3,
|
||||
last_attempt = $4,
|
||||
n_attempts = $5
|
||||
WHERE id = $1 AND status = $2
|
||||
"#,
|
||||
self.id,
|
||||
Status::Pending.as_str(),
|
||||
self.status.as_str(),
|
||||
self.last_attempt,
|
||||
self.n_attempts,
|
||||
);
|
||||
|
||||
let query_result = query.execute(exec).await?;
|
||||
|
||||
Ok(query_result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn update<'a, E>(&self, exec: E) -> sqlx::Result<()>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let query = query!(
|
||||
r#"UPDATE users_redeemals
|
||||
SET
|
||||
offer = $2,
|
||||
status = $3,
|
||||
redeemed = $4,
|
||||
last_attempt = $5,
|
||||
n_attempts = $6
|
||||
WHERE id = $1
|
||||
"#,
|
||||
self.id,
|
||||
self.offer.as_str(),
|
||||
self.status.as_str(),
|
||||
self.redeemed,
|
||||
self.last_attempt,
|
||||
self.n_attempts,
|
||||
);
|
||||
|
||||
query.execute(exec).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RedeemalLookupFields {
|
||||
pub user_id: DBUserId,
|
||||
pub redeemal_status: Option<Status>,
|
||||
}
|
||||
|
||||
impl RedeemalLookupFields {
|
||||
/// Returns the redeemal status of a user for an offer, while looking up the user
|
||||
/// itself. **This expects a single redeemal per user/offer pair**.
|
||||
///
|
||||
/// If the returned value is `Ok(None)`, the user doesn't exist.
|
||||
///
|
||||
/// If the returned value is `Ok(Some(fields))`, but `redeemal_status` is `None`,
|
||||
/// the user exists and has not redeemed the offer.
|
||||
pub async fn redeemal_status_by_username_and_offer<'a, E>(
|
||||
exec: E,
|
||||
user_username: &str,
|
||||
offer: Offer,
|
||||
) -> sqlx::Result<Option<RedeemalLookupFields>>
|
||||
where
|
||||
E: sqlx::PgExecutor<'a>,
|
||||
{
|
||||
let maybe_row = query!(
|
||||
r#"
|
||||
SELECT
|
||||
users.id,
|
||||
users_redeemals.status AS "status: Option<String>"
|
||||
FROM
|
||||
users
|
||||
LEFT JOIN
|
||||
users_redeemals ON users_redeemals.user_id = users.id
|
||||
AND users_redeemals.offer = $2
|
||||
WHERE
|
||||
users.username = $1
|
||||
ORDER BY
|
||||
users_redeemals.redeemed DESC
|
||||
LIMIT 1
|
||||
"#,
|
||||
user_username,
|
||||
offer.as_str(),
|
||||
)
|
||||
.fetch_optional(exec)
|
||||
.await?;
|
||||
|
||||
// If no row was returned, the user doesn't exist.
|
||||
// If a row NULL status was returned, the user exists but has no redeemed the offer.
|
||||
|
||||
Ok(maybe_row.map(|row| RedeemalLookupFields {
|
||||
user_id: DBUserId(row.id),
|
||||
redeemal_status: row
|
||||
.status
|
||||
.as_deref()
|
||||
.map(Status::from_str_or_default),
|
||||
}))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user