Payments/subscriptions support (#943)

* [wip] Payments/subscriptions support

* finish

* working payment flow

* finish subscriptions, lint, clippy, etc

* docker compose
This commit is contained in:
Geometrically
2024-08-14 17:14:52 -07:00
committed by GitHub
parent 60edbcd5f0
commit 1d0d8d7fbe
71 changed files with 4009 additions and 1101 deletions

View File

@@ -108,15 +108,13 @@ impl Category {
ORDER BY c.ordering, c.category
"
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|c| Category {
id: CategoryId(c.id),
category: c.category,
project_type: c.project_type,
icon: c.icon,
header: c.category_header
}))
.fetch(exec)
.map_ok(|c| Category {
id: CategoryId(c.id),
category: c.category,
project_type: c.project_type,
icon: c.icon,
header: c.category_header
})
.try_collect::<Vec<Category>>()
.await?;
@@ -166,13 +164,11 @@ impl LinkPlatform {
SELECT id, name, donation FROM link_platforms
"
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|c| LinkPlatform {
id: LinkPlatformId(c.id),
name: c.name,
donation: c.donation,
}))
.fetch(exec)
.map_ok(|c| LinkPlatform {
id: LinkPlatformId(c.id),
name: c.name,
donation: c.donation,
})
.try_collect::<Vec<LinkPlatform>>()
.await?;
@@ -222,8 +218,8 @@ impl ReportType {
SELECT name FROM report_types
"
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|c| c.name)) })
.fetch(exec)
.map_ok(|c| c.name)
.try_collect::<Vec<String>>()
.await?;
@@ -272,8 +268,8 @@ impl ProjectType {
SELECT name FROM project_types
"
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|c| c.name)) })
.fetch(exec)
.map_ok(|c| c.name)
.try_collect::<Vec<String>>()
.await?;

View File

@@ -232,6 +232,30 @@ generate_ids!(
PayoutId
);
generate_ids!(
pub generate_product_id,
ProductId,
8,
"SELECT EXISTS(SELECT 1 FROM products WHERE id=$1)",
ProductId
);
generate_ids!(
pub generate_product_price_id,
ProductPriceId,
8,
"SELECT EXISTS(SELECT 1 FROM products_prices WHERE id=$1)",
ProductPriceId
);
generate_ids!(
pub generate_user_subscription_id,
UserSubscriptionId,
8,
"SELECT EXISTS(SELECT 1 FROM users_subscriptions WHERE id=$1)",
UserSubscriptionId
);
#[derive(Copy, Clone, Debug, PartialEq, Eq, Type, Hash, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct UserId(pub i64);
@@ -351,6 +375,17 @@ pub struct OAuthAccessTokenId(pub i64);
#[sqlx(transparent)]
pub struct PayoutId(pub i64);
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[sqlx(transparent)]
pub struct ProductId(pub i64);
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[sqlx(transparent)]
pub struct ProductPriceId(pub i64);
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[sqlx(transparent)]
pub struct UserSubscriptionId(pub i64);
use crate::models::ids;
impl From<ids::ProjectId> for ProjectId {
@@ -504,3 +539,35 @@ impl From<PayoutId> for ids::PayoutId {
ids::PayoutId(id.0 as u64)
}
}
impl From<ids::ProductId> for ProductId {
fn from(id: ids::ProductId) -> Self {
ProductId(id.0 as i64)
}
}
impl From<ProductId> for ids::ProductId {
fn from(id: ProductId) -> Self {
ids::ProductId(id.0 as u64)
}
}
impl From<ids::ProductPriceId> for ProductPriceId {
fn from(id: ids::ProductPriceId) -> Self {
ProductPriceId(id.0 as i64)
}
}
impl From<ProductPriceId> for ids::ProductPriceId {
fn from(id: ProductPriceId) -> Self {
ids::ProductPriceId(id.0 as u64)
}
}
impl From<ids::UserSubscriptionId> for UserSubscriptionId {
fn from(id: ids::UserSubscriptionId) -> Self {
UserSubscriptionId(id.0 as i64)
}
}
impl From<UserSubscriptionId> for ids::UserSubscriptionId {
fn from(id: UserSubscriptionId) -> Self {
ids::UserSubscriptionId(id.0 as u64)
}
}

View File

@@ -135,24 +135,22 @@ impl Image {
report_id.map(|x| x.0),
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async {
Ok(e.right().map(|row| {
let id = ImageId(row.id);
.fetch(&mut **transaction)
.map_ok(|row| {
let id = ImageId(row.id);
Image {
id,
url: row.url,
size: row.size as u64,
created: row.created,
owner_id: UserId(row.owner_id),
context: row.context,
project_id: row.mod_id.map(ProjectId),
version_id: row.version_id.map(VersionId),
thread_message_id: row.thread_message_id.map(ThreadMessageId),
report_id: row.report_id.map(ReportId),
}
}))
Image {
id,
url: row.url,
size: row.size as u64,
created: row.created,
owner_id: UserId(row.owner_id),
context: row.context,
project_id: row.mod_id.map(ProjectId),
version_id: row.version_id.map(VersionId),
thread_message_id: row.thread_message_id.map(ThreadMessageId),
report_id: row.report_id.map(ReportId),
}
})
.try_collect::<Vec<Image>>()
.await

View File

@@ -60,15 +60,13 @@ impl Game {
SELECT id, slug, name, icon_url, banner_url FROM games
",
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| Game {
id: GameId(x.id),
slug: x.slug,
name: x.name,
icon_url: x.icon_url,
banner_url: x.banner_url,
}))
.fetch(exec)
.map_ok(|x| Game {
id: GameId(x.id),
slug: x.slug,
name: x.name,
icon_url: x.icon_url,
banner_url: x.banner_url,
})
.try_collect::<Vec<Game>>()
.await?;
@@ -151,24 +149,21 @@ impl Loader {
GROUP BY l.id;
",
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| Loader {
id: LoaderId(x.id),
loader: x.loader,
icon: x.icon,
supported_project_types: x
.project_types
.unwrap_or_default()
.iter()
.map(|x| x.to_string())
.collect(),
supported_games: x
.games
.unwrap_or_default(),
metadata: x.metadata
}))
.fetch(exec)
.map_ok(|x| Loader {
id: LoaderId(x.id),
loader: x.loader,
icon: x.icon,
supported_project_types: x
.project_types
.unwrap_or_default()
.iter()
.map(|x| x.to_string())
.collect(),
supported_games: x
.games
.unwrap_or_default(),
metadata: x.metadata
})
.try_collect::<Vec<_>>()
.await?;
@@ -451,21 +446,22 @@ impl LoaderField {
FROM loader_fields lf
",
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().and_then(|r| {
Some(LoaderField {
id: LoaderFieldId(r.id),
field_type: LoaderFieldType::build(&r.field_type, r.enum_type)?,
field: r.field,
optional: r.optional,
min_val: r.min_val,
max_val: r.max_val,
})
}))
.fetch(exec)
.map_ok(|r| {
Some(LoaderField {
id: LoaderFieldId(r.id),
field_type: LoaderFieldType::build(&r.field_type, r.enum_type)?,
field: r.field,
optional: r.optional,
min_val: r.min_val,
max_val: r.max_val,
})
})
.try_collect::<Vec<LoaderField>>()
.await?;
.try_collect::<Vec<Option<LoaderField>>>()
.await?
.into_iter()
.flatten()
.collect();
redis
.set_serialized_to_json(LOADER_FIELDS_NAMESPACE_ALL, "", &result, None)

View File

@@ -14,12 +14,14 @@ pub mod oauth_token_item;
pub mod organization_item;
pub mod pat_item;
pub mod payout_item;
pub mod product_item;
pub mod project_item;
pub mod report_item;
pub mod session_item;
pub mod team_item;
pub mod thread_item;
pub mod user_item;
pub mod user_subscription_item;
pub mod version_item;
pub use collection_item::Collection;

View File

@@ -110,35 +110,33 @@ impl Notification {
",
&notification_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|row| {
let id = NotificationId(row.id);
.fetch(exec)
.map_ok(|row| {
let id = NotificationId(row.id);
Notification {
id,
user_id: UserId(row.user_id),
read: row.read,
created: row.created,
body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| {
if let Some(name) = row.name {
NotificationBody::LegacyMarkdown {
notification_type: row.notification_type,
name,
text: row.text.unwrap_or_default(),
link: row.link.unwrap_or_default(),
actions: serde_json::from_value(
row.actions.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
} else {
NotificationBody::Unknown
Notification {
id,
user_id: UserId(row.user_id),
read: row.read,
created: row.created,
body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| {
if let Some(name) = row.name {
NotificationBody::LegacyMarkdown {
notification_type: row.notification_type,
name,
text: row.text.unwrap_or_default(),
link: row.link.unwrap_or_default(),
actions: serde_json::from_value(
row.actions.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
}),
}
}))
} else {
NotificationBody::Unknown
}
}),
}
})
.try_collect::<Vec<Notification>>()
.await
@@ -173,35 +171,33 @@ impl Notification {
",
user_id as UserId
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|row| {
let id = NotificationId(row.id);
.fetch(exec)
.map_ok(|row| {
let id = NotificationId(row.id);
Notification {
id,
user_id: UserId(row.user_id),
read: row.read,
created: row.created,
body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| {
if let Some(name) = row.name {
NotificationBody::LegacyMarkdown {
notification_type: row.notification_type,
name,
text: row.text.unwrap_or_default(),
link: row.link.unwrap_or_default(),
actions: serde_json::from_value(
row.actions.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
} else {
NotificationBody::Unknown
Notification {
id,
user_id: UserId(row.user_id),
read: row.read,
created: row.created,
body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| {
if let Some(name) = row.name {
NotificationBody::LegacyMarkdown {
notification_type: row.notification_type,
name,
text: row.text.unwrap_or_default(),
link: row.link.unwrap_or_default(),
actions: serde_json::from_value(
row.actions.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
}),
}
}))
} else {
NotificationBody::Unknown
}
}),
}
})
.try_collect::<Vec<Notification>>()
.await?;
@@ -242,8 +238,8 @@ impl Notification {
",
&notification_ids_parsed
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| UserId(x.user_id))) })
.fetch(&mut **transaction)
.map_ok(|x| UserId(x.user_id))
.try_collect::<Vec<_>>()
.await?;
@@ -285,8 +281,8 @@ impl Notification {
",
&notification_ids_parsed
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| UserId(x.user_id))) })
.fetch(&mut **transaction)
.map_ok(|x| UserId(x.user_id))
.try_collect::<Vec<_>>()
.await?;

View File

@@ -167,8 +167,8 @@ impl PersonalAccessToken {
",
user_id.0,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|x| PatId(x.id))) })
.fetch(exec)
.map_ok(|x| PatId(x.id))
.try_collect::<Vec<PatId>>()
.await?;

View File

@@ -74,19 +74,17 @@ impl Payout {
",
&payout_ids.into_iter().map(|x| x.0).collect::<Vec<_>>()
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|r| Payout {
id: PayoutId(r.id),
user_id: UserId(r.user_id),
created: r.created,
status: PayoutStatus::from_string(&r.status),
amount: r.amount,
method: r.method.map(|x| PayoutMethodType::from_string(&x)),
method_address: r.method_address,
platform_id: r.platform_id,
fee: r.fee,
}))
.fetch(exec)
.map_ok(|r| Payout {
id: PayoutId(r.id),
user_id: UserId(r.user_id),
created: r.created,
status: PayoutStatus::from_string(&r.status),
amount: r.amount,
method: r.method.map(|x| PayoutMethodType::from_string(&x)),
method_address: r.method_address,
platform_id: r.platform_id,
fee: r.fee,
})
.try_collect::<Vec<Payout>>()
.await?;

View File

@@ -0,0 +1,248 @@
use crate::database::models::{product_item, DatabaseError, ProductId, ProductPriceId};
use crate::database::redis::RedisPool;
use crate::models::billing::{Price, ProductMetadata};
use dashmap::DashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::convert::TryInto;
const PRODUCTS_NAMESPACE: &str = "products";
pub struct ProductItem {
pub id: ProductId,
pub metadata: ProductMetadata,
pub unitary: bool,
}
struct ProductResult {
id: i64,
metadata: serde_json::Value,
unitary: bool,
}
macro_rules! select_products_with_predicate {
($predicate:tt, $param:ident) => {
sqlx::query_as!(
ProductResult,
r#"
SELECT id, metadata, unitary
FROM products
"#
+ $predicate,
$param
)
};
}
impl TryFrom<ProductResult> for ProductItem {
type Error = serde_json::Error;
fn try_from(r: ProductResult) -> Result<Self, Self::Error> {
Ok(ProductItem {
id: ProductId(r.id),
metadata: serde_json::from_value(r.metadata)?,
unitary: r.unitary,
})
}
}
impl ProductItem {
pub async fn get(
id: ProductId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<ProductItem>, DatabaseError> {
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
}
pub async fn get_many(
ids: &[ProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ProductItem>, DatabaseError> {
let ids = ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
let results = select_products_with_predicate!("WHERE id = ANY($1::bigint[])", ids_ref)
.fetch_all(exec)
.await?;
Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn get_all(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ProductItem>, DatabaseError> {
let one = 1;
let results = select_products_with_predicate!("WHERE 1 = $1", one)
.fetch_all(exec)
.await?;
Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
}
#[derive(Deserialize, Serialize)]
pub struct QueryProduct {
pub id: ProductId,
pub metadata: ProductMetadata,
pub unitary: bool,
pub prices: Vec<ProductPriceItem>,
}
impl QueryProduct {
pub async fn list<'a, E>(exec: E, redis: &RedisPool) -> Result<Vec<QueryProduct>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
let mut redis = redis.connect().await?;
let res: Option<Vec<QueryProduct>> = redis
.get_deserialized_from_json(PRODUCTS_NAMESPACE, "all")
.await?;
if let Some(res) = res {
return Ok(res);
}
let all_products = product_item::ProductItem::get_all(exec).await?;
let prices = product_item::ProductPriceItem::get_all_products_prices(
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
exec,
)
.await?;
let products = all_products
.into_iter()
.map(|x| QueryProduct {
id: x.id,
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)
.unwrap_or_default()
.into_iter()
.map(|x| ProductPriceItem {
id: x.id,
product_id: x.product_id,
prices: x.prices,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
})
.collect::<Vec<_>>();
redis
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
.await?;
Ok(products)
}
}
#[derive(Deserialize, Serialize)]
pub struct ProductPriceItem {
pub id: ProductPriceId,
pub product_id: ProductId,
pub prices: Price,
pub currency_code: String,
}
struct ProductPriceResult {
id: i64,
product_id: i64,
prices: serde_json::Value,
currency_code: String,
}
macro_rules! select_prices_with_predicate {
($predicate:tt, $param:ident) => {
sqlx::query_as!(
ProductPriceResult,
r#"
SELECT id, product_id, prices, currency_code
FROM products_prices
"#
+ $predicate,
$param
)
};
}
impl TryFrom<ProductPriceResult> for ProductPriceItem {
type Error = serde_json::Error;
fn try_from(r: ProductPriceResult) -> Result<Self, Self::Error> {
Ok(ProductPriceItem {
id: ProductPriceId(r.id),
product_id: ProductId(r.product_id),
prices: serde_json::from_value(r.prices)?,
currency_code: r.currency_code,
})
}
}
impl ProductPriceItem {
pub async fn get(
id: ProductPriceId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<ProductPriceItem>, DatabaseError> {
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
}
pub async fn get_many(
ids: &[ProductPriceId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ProductPriceItem>, DatabaseError> {
let ids = ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
let results = select_prices_with_predicate!("WHERE id = ANY($1::bigint[])", ids_ref)
.fetch_all(exec)
.await?;
Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}
pub async fn get_all_product_prices(
product_id: ProductId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ProductPriceItem>, DatabaseError> {
let res = Self::get_all_products_prices(&[product_id], exec).await?;
Ok(res.remove(&product_id).map(|x| x.1).unwrap_or_default())
}
pub async fn get_all_products_prices(
product_ids: &[ProductId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<DashMap<ProductId, Vec<ProductPriceItem>>, DatabaseError> {
let ids = product_ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
use futures_util::TryStreamExt;
let prices = select_prices_with_predicate!("WHERE product_id = ANY($1::bigint[])", ids_ref)
.fetch(exec)
.try_fold(
DashMap::new(),
|acc: DashMap<ProductId, Vec<ProductPriceItem>>, x| {
if let Ok(item) = <ProductPriceResult as TryInto<ProductPriceItem>>::try_into(x)
{
acc.entry(item.product_id).or_default().push(item);
}
async move { Ok(acc) }
},
)
.await?;
Ok(prices)
}
}

View File

@@ -358,8 +358,8 @@ impl Project {
",
id as ProjectId,
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| ThreadId(x.id))) })
.fetch(&mut **transaction)
.map_ok(|x| ThreadId(x.id))
.try_collect::<Vec<_>>()
.await?;
@@ -443,8 +443,8 @@ impl Project {
",
project.inner.team_id as TeamId,
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| UserId(x.user_id))) })
.fetch(&mut **transaction)
.map_ok(|x| UserId(x.user_id))
.try_collect::<Vec<_>>()
.await?;
@@ -874,19 +874,17 @@ impl Project {
",
id as ProjectId
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| {
(
x.dependency_id.map(VersionId),
if x.mod_id == Some(0) {
None
} else {
x.mod_id.map(ProjectId)
},
x.mod_dependency_id.map(ProjectId),
)
}))
.fetch(exec)
.map_ok(|x| {
(
x.dependency_id.map(VersionId),
if x.mod_id == Some(0) {
None
} else {
x.mod_id.map(ProjectId)
},
x.mod_dependency_id.map(ProjectId),
)
})
.try_collect::<Dependencies>()
.await?;

View File

@@ -86,20 +86,18 @@ impl Report {
",
&report_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| QueryReport {
id: ReportId(x.id),
report_type: x.name,
project_id: x.mod_id.map(ProjectId),
version_id: x.version_id.map(VersionId),
user_id: x.user_id.map(UserId),
body: x.body,
reporter: UserId(x.reporter),
created: x.created,
closed: x.closed,
thread_id: ThreadId(x.thread_id)
}))
.fetch(exec)
.map_ok(|x| QueryReport {
id: ReportId(x.id),
report_type: x.name,
project_id: x.mod_id.map(ProjectId),
version_id: x.version_id.map(VersionId),
user_id: x.user_id.map(UserId),
body: x.body,
reporter: UserId(x.reporter),
created: x.created,
closed: x.closed,
thread_id: ThreadId(x.thread_id)
})
.try_collect::<Vec<QueryReport>>()
.await?;

View File

@@ -220,8 +220,8 @@ impl Session {
",
user_id.0,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|x| SessionId(x.id))) })
.fetch(exec)
.map_ok(|x| SessionId(x.id))
.try_collect::<Vec<SessionId>>()
.await?;

View File

@@ -300,35 +300,25 @@ impl TeamMember {
&team_ids_parsed,
user_id as UserId
)
.fetch_many(executor)
.try_filter_map(|e| async {
if let Some(m) = e.right() {
Ok(Some(Ok(TeamMember {
id: TeamMemberId(m.id),
team_id: TeamId(m.team_id),
user_id,
role: m.role,
is_owner: m.is_owner,
permissions: ProjectPermissions::from_bits(m.permissions as u64)
.unwrap_or_default(),
organization_permissions: m
.organization_permissions
.map(|p| OrganizationPermissions::from_bits(p as u64).unwrap_or_default()),
accepted: m.accepted,
payouts_split: m.payouts_split,
ordering: m.ordering,
})))
} else {
Ok(None)
}
.fetch(executor)
.map_ok(|m| TeamMember {
id: TeamMemberId(m.id),
team_id: TeamId(m.team_id),
user_id,
role: m.role,
is_owner: m.is_owner,
permissions: ProjectPermissions::from_bits(m.permissions as u64)
.unwrap_or_default(),
organization_permissions: m
.organization_permissions
.map(|p| OrganizationPermissions::from_bits(p as u64).unwrap_or_default()),
accepted: m.accepted,
payouts_split: m.payouts_split,
ordering: m.ordering,
})
.try_collect::<Vec<Result<TeamMember, super::DatabaseError>>>()
.try_collect::<Vec<TeamMember>>()
.await?;
let team_members = team_members
.into_iter()
.collect::<Result<Vec<TeamMember>, super::DatabaseError>>()?;
Ok(team_members)
}

View File

@@ -144,9 +144,8 @@ impl Thread {
",
&thread_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| Thread {
.fetch(exec)
.map_ok(|x| Thread {
id: ThreadId(x.id),
project_id: x.mod_id.map(ProjectId),
report_id: x.report_id.map(ReportId),
@@ -161,8 +160,7 @@ impl Thread {
messages
},
members: x.members.unwrap_or_default().into_iter().map(UserId).collect(),
}))
})
})
.try_collect::<Vec<Thread>>()
.await?;
@@ -236,17 +234,14 @@ impl ThreadMessage {
",
&message_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| ThreadMessage {
id: ThreadMessageId(x.id),
thread_id: ThreadId(x.thread_id),
author_id: x.author_id.map(UserId),
body: serde_json::from_value(x.body)
.unwrap_or(MessageBody::Deleted { private: false }),
created: x.created,
hide_identity: x.hide_identity,
}))
.fetch(exec)
.map_ok(|x| ThreadMessage {
id: ThreadMessageId(x.id),
thread_id: ThreadId(x.thread_id),
author_id: x.author_id.map(UserId),
body: serde_json::from_value(x.body).unwrap_or(MessageBody::Deleted { private: false }),
created: x.created,
hide_identity: x.hide_identity,
})
.try_collect::<Vec<ThreadMessage>>()
.await?;

View File

@@ -32,6 +32,7 @@ pub struct User {
pub paypal_country: Option<String>,
pub paypal_email: Option<String>,
pub venmo_handle: Option<String>,
pub stripe_customer_id: Option<String>,
pub totp_secret: Option<String>,
@@ -60,13 +61,13 @@ impl User {
avatar_url, bio, created,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,
email_verified, password, paypal_id, paypal_country, paypal_email,
venmo_handle
venmo_handle, stripe_customer_id
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7,
$8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19
$14, $15, $16, $17, $18, $19, $20
)
",
self.id as UserId,
@@ -87,7 +88,8 @@ impl User {
self.paypal_id,
self.paypal_country,
self.paypal_email,
self.venmo_handle
self.venmo_handle,
self.stripe_customer_id
)
.execute(&mut **transaction)
.await?;
@@ -170,7 +172,7 @@ impl User {
balance,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,
email_verified, password, totp_secret, paypal_id, paypal_country, paypal_email,
venmo_handle
venmo_handle, stripe_customer_id
FROM users
WHERE id = ANY($1) OR LOWER(username) = ANY($2)
",
@@ -202,6 +204,7 @@ impl User {
paypal_country: u.paypal_country,
paypal_email: u.paypal_email,
venmo_handle: u.venmo_handle,
stripe_customer_id: u.stripe_customer_id,
totp_secret: u.totp_secret,
};
@@ -264,8 +267,8 @@ impl User {
",
user_id as UserId,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|m| ProjectId(m.id))) })
.fetch(exec)
.map_ok(|m| ProjectId(m.id))
.try_collect::<Vec<ProjectId>>()
.await?;
@@ -293,8 +296,8 @@ impl User {
",
user_id as UserId,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|m| OrganizationId(m.id))) })
.fetch(exec)
.map_ok(|m| OrganizationId(m.id))
.try_collect::<Vec<OrganizationId>>()
.await?;
@@ -317,8 +320,8 @@ impl User {
",
user_id as UserId,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|m| CollectionId(m.id))) })
.fetch(exec)
.map_ok(|m| CollectionId(m.id))
.try_collect::<Vec<CollectionId>>()
.await?;
@@ -341,8 +344,8 @@ impl User {
",
user_id as UserId,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|m| to_base62(m.code as u64))) })
.fetch(exec)
.map_ok(|m| to_base62(m.code as u64))
.try_collect::<Vec<String>>()
.await?;
@@ -430,8 +433,8 @@ impl User {
",
id as UserId,
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| m.id)) })
.fetch(&mut **transaction)
.map_ok(|m| m.id)
.try_collect::<Vec<i64>>()
.await?;
@@ -463,8 +466,8 @@ impl User {
",
id as UserId,
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| CollectionId(x.id))) })
.fetch(&mut **transaction)
.map_ok(|x| CollectionId(x.id))
.try_collect::<Vec<_>>()
.await?;
@@ -481,8 +484,8 @@ impl User {
",
id as UserId,
)
.fetch_many(&mut **transaction)
.try_filter_map(|e| async { Ok(e.right().map(|x| ThreadId(x.id))) })
.fetch(&mut **transaction)
.map_ok(|x| ThreadId(x.id))
.try_collect::<Vec<_>>()
.await?;

View File

@@ -0,0 +1,153 @@
use crate::database::models::{DatabaseError, ProductPriceId, UserId, UserSubscriptionId};
use crate::models::billing::{PriceDuration, SubscriptionStatus};
use chrono::{DateTime, Utc};
use itertools::Itertools;
pub struct UserSubscriptionItem {
pub id: UserSubscriptionId,
pub user_id: UserId,
pub price_id: ProductPriceId,
pub interval: PriceDuration,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub last_charge: Option<DateTime<Utc>>,
pub status: SubscriptionStatus,
}
struct UserSubscriptionResult {
id: i64,
user_id: i64,
price_id: i64,
interval: String,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub last_charge: Option<DateTime<Utc>>,
pub status: String,
}
macro_rules! select_user_subscriptions_with_predicate {
($predicate:tt, $param:ident) => {
sqlx::query_as!(
UserSubscriptionResult,
r#"
SELECT
id, user_id, price_id, interval, created, expires, last_charge, status
FROM users_subscriptions
"#
+ $predicate,
$param
)
};
}
impl From<UserSubscriptionResult> for UserSubscriptionItem {
fn from(r: UserSubscriptionResult) -> Self {
UserSubscriptionItem {
id: UserSubscriptionId(r.id),
user_id: UserId(r.user_id),
price_id: ProductPriceId(r.price_id),
interval: PriceDuration::from_string(&r.interval),
created: r.created,
expires: r.expires,
last_charge: r.last_charge,
status: SubscriptionStatus::from_string(&r.status),
}
}
}
impl UserSubscriptionItem {
pub async fn get(
id: UserSubscriptionId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<UserSubscriptionItem>, DatabaseError> {
Ok(Self::get_many(&[id], exec).await?.into_iter().next())
}
pub async fn get_many(
ids: &[UserSubscriptionId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let ids = ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
let results =
select_user_subscriptions_with_predicate!("WHERE id = ANY($1::bigint[])", ids_ref)
.fetch_all(exec)
.await?;
Ok(results.into_iter().map(|r| r.into()).collect())
}
pub async fn get_all_user(
user_id: UserId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let user_id = user_id.0;
let results = select_user_subscriptions_with_predicate!("WHERE user_id = $1", user_id)
.fetch_all(exec)
.await?;
Ok(results.into_iter().map(|r| r.into()).collect())
}
pub async fn get_all_expired(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let now = Utc::now();
let results = select_user_subscriptions_with_predicate!("WHERE expires < $1", now)
.fetch_all(exec)
.await?;
Ok(results.into_iter().map(|r| r.into()).collect())
}
pub async fn upsert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
INSERT INTO users_subscriptions (
id, user_id, price_id, interval, created, expires, last_charge, status
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8
)
ON CONFLICT (id)
DO UPDATE
SET interval = EXCLUDED.interval,
expires = EXCLUDED.expires,
last_charge = EXCLUDED.last_charge,
status = EXCLUDED.status
",
self.id.0,
self.user_id.0,
self.price_id.0,
self.interval.as_str(),
self.created,
self.expires,
self.last_charge,
self.status.as_str(),
)
.execute(&mut **transaction)
.await?;
Ok(())
}
pub async fn remove(
id: UserSubscriptionId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
DELETE FROM users_subscriptions
WHERE id = $1
",
id.0 as i64
)
.execute(&mut **transaction)
.await?;
Ok(())
}
}