Initial Auth Impl + More Caching (#647)

* Port redis to staging

* redis cache on staging

* add back legacy auth callback

* Begin work on new auth flows

* Finish all auth flows

* Finish base session authentication

* run prep + fix clippy

* make compilation work
Geometrically committed 2023-07-07 12:20:16 -07:00 (committed by GitHub)
parent b0057b130e · commit 239214ef92
53 changed files with 6250 additions and 6359 deletions
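Every cached model in this commit follows the same cache-aside read: check Redis first, fall back to Postgres for the misses, then write the rows back with a 30-minute expiry. A minimal self-contained sketch of that shape (all names here are hypothetical; the real code below batches lookups with MGET instead of single GETs):

use redis::cmd;
use serde::{Deserialize, Serialize};

const NAMESPACE: &str = "example"; // hypothetical; real namespaces below are "sessions", "users", ...
const DEFAULT_EXPIRY: i64 = 1800; // 30 minutes, matching the constants in this commit

#[derive(Serialize, Deserialize)]
struct CachedValue {
    id: i64,
}

// Hypothetical helper showing the read path used throughout this diff.
async fn cache_aside_get(
    id: i64,
    redis: &deadpool_redis::Pool,
) -> Result<Option<CachedValue>, Box<dyn std::error::Error>> {
    let mut conn = redis.get().await?;

    // 1. Try the cache first.
    let hit: Option<String> = cmd("GET")
        .arg(format!("{}:{}", NAMESPACE, id))
        .query_async(&mut conn)
        .await?;
    if let Some(json) = hit {
        if let Ok(value) = serde_json::from_str::<CachedValue>(&json) {
            return Ok(Some(value));
        }
    }

    // 2. Miss: load from Postgres here (elided in this sketch) ...
    let value = CachedValue { id };

    // 3. ... then write back with an expiry so stale entries age out.
    cmd("SET")
        .arg(format!("{}:{}", NAMESPACE, id))
        .arg(serde_json::to_string(&value)?)
        .arg("EX")
        .arg(DEFAULT_EXPIRY)
        .query_async::<_, ()>(&mut conn)
        .await?;

    Ok(Some(value))
}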

src/database/models/ids.rs

@@ -2,7 +2,7 @@ use super::DatabaseError;
use crate::models::ids::base62_impl::to_base62;
use crate::models::ids::random_base62_rng;
use censor::Censor;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use sqlx::sqlx_macros::Type;
const ID_RETRY_COUNT: usize = 20;
@@ -129,35 +129,43 @@ generate_ids!(
ThreadMessageId
);
#[derive(Copy, Clone, Debug, PartialEq, Eq, Type, Deserialize)]
generate_ids!(
pub generate_session_id,
SessionId,
8,
"SELECT EXISTS(SELECT 1 FROM sessions WHERE id=$1)",
SessionId
);
#[derive(Copy, Clone, Debug, PartialEq, Eq, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct UserId(pub i64);
#[derive(Copy, Clone, Debug, Type, Eq, PartialEq)]
#[derive(Copy, Clone, Debug, Type, Eq, PartialEq, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct TeamId(pub i64);
#[derive(Copy, Clone, Debug, Type)]
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct TeamMemberId(pub i64);
#[derive(Copy, Clone, Debug, Type, PartialEq, Eq, Deserialize, Hash)]
#[derive(Copy, Clone, Debug, Type, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct ProjectId(pub i64);
#[derive(Copy, Clone, Debug, Type)]
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct ProjectTypeId(pub i32);
#[derive(Copy, Clone, Debug, Type)]
#[sqlx(transparent)]
pub struct StatusId(pub i32);
#[derive(Copy, Clone, Debug, Type)]
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct SideTypeId(pub i32);
#[derive(Copy, Clone, Debug, Type, Deserialize)]
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct DonationPlatformId(pub i32);
#[derive(Copy, Clone, Debug, Type, PartialEq, Eq, Hash, Deserialize)]
#[derive(Copy, Clone, Debug, Type, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct VersionId(pub i64);
#[derive(Copy, Clone, Debug, Type, Deserialize)]
@@ -177,7 +185,7 @@ pub struct ReportId(pub i64);
#[sqlx(transparent)]
pub struct ReportTypeId(pub i32);
#[derive(Copy, Clone, Debug, Type, Hash, Eq, PartialEq, Deserialize)]
#[derive(Copy, Clone, Debug, Type, Hash, Eq, PartialEq, Deserialize, Serialize)]
#[sqlx(transparent)]
pub struct FileId(pub i64);
@@ -196,13 +204,17 @@ pub struct NotificationId(pub i64);
#[sqlx(transparent)]
pub struct NotificationActionId(pub i32);
#[derive(Copy, Clone, Debug, Type, Deserialize, Eq, PartialEq)]
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize, Eq, PartialEq)]
#[sqlx(transparent)]
pub struct ThreadId(pub i64);
#[derive(Copy, Clone, Debug, Type, Deserialize)]
#[sqlx(transparent)]
pub struct ThreadMessageId(pub i64);
#[derive(Copy, Clone, Debug, Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct SessionId(pub i64);
use crate::models::ids;
impl From<ids::ProjectId> for ProjectId {
@@ -285,3 +297,8 @@ impl From<ThreadMessageId> for ids::ThreadMessageId {
ids::ThreadMessageId(id.0 as u64)
}
}
impl From<SessionId> for ids::SessionId {
fn from(id: SessionId) -> Self {
ids::SessionId(id.0 as u64)
}
}

src/database/models/mod.rs

@@ -5,6 +5,7 @@ pub mod ids;
pub mod notification_item;
pub mod project_item;
pub mod report_item;
pub mod session_item;
pub mod team_item;
pub mod thread_item;
pub mod user_item;
@@ -21,11 +22,13 @@ pub use version_item::Version;
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("Error while interacting with the database: {0}")]
Database(#[from] sqlx::error::Error),
Database(#[from] sqlx::Error),
#[error("Error while trying to generate random ID")]
RandomId,
#[error("A database request failed")]
Other(String),
#[error("Error while parsing JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("Error while interacting with the cache: {0}")]
CacheError(#[from] redis::RedisError),
#[error("Redis Pool Error: {0}")]
RedisPool(#[from] deadpool_redis::PoolError),
#[error("Error while serializing with the cache: {0}")]
SerdeCacheError(#[from] serde_json::Error),
}
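The new #[from] variants let cache and database fallibility share a single `?` path. A sketch assuming this crate's DatabaseError above (the function itself is hypothetical):

use redis::cmd;

// Hypothetical: each fallible call converts into DatabaseError via the
// #[from] impls above (deadpool_redis::PoolError, redis::RedisError,
// serde_json::Error, sqlx::Error).
async fn read_cached_json(
    key: &str,
    redis: &deadpool_redis::Pool,
) -> Result<Option<serde_json::Value>, DatabaseError> {
    let mut conn = redis.get().await?; // PoolError -> RedisPool
    let raw: Option<String> = cmd("GET")
        .arg(key)
        .query_async(&mut conn)
        .await?; // RedisError -> CacheError
    raw.map(|x| serde_json::from_str::<serde_json::Value>(&x))
        .transpose()
        .map_err(DatabaseError::from) // serde_json::Error -> SerdeCacheError
}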

File diff suppressed because it is too large.

src/database/models/session_item.rs

@@ -0,0 +1,312 @@
use super::ids::*;
use crate::database::models::DatabaseError;
use crate::models::ids::base62_impl::{parse_base62, to_base62};
use chrono::{DateTime, Utc};
use redis::cmd;
use serde::{Deserialize, Serialize};
const SESSIONS_NAMESPACE: &str = "sessions";
const SESSIONS_IDS_NAMESPACE: &str = "sessions_ids";
const SESSIONS_USERS_NAMESPACE: &str = "sessions_users";
const DEFAULT_EXPIRY: i64 = 1800; // 30 minutes
// TODO: Manage sessions cache + clear cache when needed
pub struct SessionBuilder {
pub session: String,
pub user_id: UserId,
pub os: Option<String>,
pub platform: Option<String>,
pub city: Option<String>,
pub country: Option<String>,
pub ip: String,
pub user_agent: String,
}
impl SessionBuilder {
pub async fn insert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<SessionId, DatabaseError> {
let id = generate_session_id(&mut *transaction).await?;
sqlx::query!(
"
INSERT INTO sessions (
id, session, user_id, os, platform,
city, country, ip, user_agent
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9
)
",
id as SessionId,
self.session,
self.user_id as UserId,
self.os,
self.platform,
self.city,
self.country,
self.ip,
self.user_agent,
)
.execute(&mut *transaction)
.await?;
Ok(id)
}
}
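A hypothetical call site for SessionBuilder::insert, assuming a sqlx::PgPool and a session token generated elsewhere:

// Hypothetical: create a session inside a transaction.
async fn create_session(
    pool: &sqlx::PgPool,
    user_id: UserId,
) -> Result<SessionId, DatabaseError> {
    let mut transaction = pool.begin().await?;
    let session_id = SessionBuilder {
        session: "generated-session-token".to_string(), // assumption: token minted by the auth layer
        user_id,
        os: None,
        platform: None,
        city: None,
        country: None,
        ip: "127.0.0.1".to_string(),
        user_agent: "example-agent".to_string(),
    }
    .insert(&mut transaction)
    .await?;
    transaction.commit().await?;
    Ok(session_id)
}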
#[derive(Deserialize, Serialize)]
pub struct Session {
pub id: SessionId,
pub session: String,
pub user_id: UserId,
pub created: DateTime<Utc>,
pub last_login: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub refresh_expires: DateTime<Utc>,
pub os: Option<String>,
pub platform: Option<String>,
pub user_agent: String,
pub city: Option<String>,
pub country: Option<String>,
pub ip: String,
}
impl Session {
pub async fn get<'a, E, T: ToString>(
id: T,
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Option<Session>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
Self::get_many(&[id], exec, redis)
.await
.map(|x| x.into_iter().next())
}
pub async fn get_id<'a, 'b, E>(
id: SessionId,
executor: E,
redis: &deadpool_redis::Pool,
) -> Result<Option<Session>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
Session::get_many(&[crate::models::ids::SessionId::from(id)], executor, redis)
.await
.map(|x| x.into_iter().next())
}
pub async fn get_many_ids<'a, E>(
user_ids: &[SessionId],
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<Session>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let ids = user_ids
.iter()
.map(|x| crate::models::ids::SessionId::from(*x))
.collect::<Vec<_>>();
Session::get_many(&ids, exec, redis).await
}
pub async fn get_many<'a, E, T: ToString>(
session_strings: &[T],
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<Session>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::TryStreamExt;
if session_strings.is_empty() {
return Ok(Vec::new());
}
let mut redis = redis.get().await?;
let mut found_sessions = Vec::new();
let mut remaining_strings = session_strings
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>();
let mut session_ids = session_strings
.iter()
.flat_map(|x| parse_base62(&x.to_string()).map(|x| x as i64))
.collect::<Vec<_>>();
session_ids.append(
&mut cmd("MGET")
.arg(
session_strings
.iter()
.map(|x| format!("{}:{}", SESSIONS_IDS_NAMESPACE, x.to_string()))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<i64>>>(&mut redis)
.await?
.into_iter()
.flatten()
.collect(),
);
if !session_ids.is_empty() {
let sessions = cmd("MGET")
.arg(
session_ids
.iter()
.map(|x| format!("{}:{}", SESSIONS_NAMESPACE, x))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<String>>>(&mut redis)
.await?;
for session in sessions {
if let Some(session) =
session.and_then(|x| serde_json::from_str::<Session>(&x).ok())
{
remaining_strings
.retain(|x| &to_base62(session.id.0 as u64) != x && &session.session != x);
found_sessions.push(session);
continue;
}
}
}
if !remaining_strings.is_empty() {
let session_ids_parsed: Vec<i64> = remaining_strings
.iter()
.flat_map(|x| parse_base62(&x.to_string()).ok())
.map(|x| x as i64)
.collect();
let db_sessions: Vec<Session> = sqlx::query!(
"
SELECT id, user_id, session, created, last_login, expires, refresh_expires, os, platform,
city, country, ip, user_agent
FROM sessions
WHERE id = ANY($1) OR session = ANY($2)
ORDER BY created DESC
",
&session_ids_parsed,
&remaining_strings,
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|x| Session {
id: SessionId(x.id),
session: x.session,
user_id: UserId(x.user_id),
created: x.created,
last_login: x.last_login,
expires: x.expires,
refresh_expires: x.refresh_expires,
os: x.os,
platform: x.platform,
city: x.city,
country: x.country,
ip: x.ip,
user_agent: x.user_agent,
}))
})
.try_collect::<Vec<Session>>()
.await?;
for session in db_sessions {
cmd("SET")
.arg(format!("{}:{}", SESSIONS_NAMESPACE, session.id.0))
.arg(serde_json::to_string(&session)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
cmd("SET")
.arg(format!("{}:{}", SESSIONS_IDS_NAMESPACE, session.session))
.arg(session.id.0)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
found_sessions.push(session);
}
}
Ok(found_sessions)
}
pub async fn get_user_sessions<'a, E>(
user_id: UserId,
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<SessionId>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let mut redis = redis.get().await?;
let res = cmd("GET")
.arg(format!("{}:{}", SESSIONS_USERS_NAMESPACE, user_id.0))
.query_async::<_, Option<Vec<i64>>>(&mut redis)
.await?;
if let Some(res) = res {
return Ok(res.into_iter().map(SessionId).collect());
}
use futures::TryStreamExt;
let db_sessions: Vec<SessionId> = sqlx::query!(
"
SELECT id
FROM sessions
WHERE user_id = $1
ORDER BY created DESC
",
user_id.0,
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|x| SessionId(x.id))) })
.try_collect::<Vec<SessionId>>()
.await?;
cmd("SET")
.arg(format!("{}:{}", SESSIONS_USERS_NAMESPACE, user_id.0))
.arg(serde_json::to_string(&db_sessions)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
Ok(db_sessions)
}
pub async fn remove(
id: SessionId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
// redis: &deadpool_redis::Pool,
) -> Result<Option<()>, sqlx::error::Error> {
sqlx::query!(
"
DELETE FROM sessions WHERE id = $1
",
id as SessionId,
)
.execute(&mut *transaction)
.await?;
Ok(Some(()))
}
}
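Per the TODO above, `remove` deletes the row but (with the `redis` parameter still commented out) leaves any cached copy in place until the 30-minute expiry lapses. A possible invalidation helper mirroring the clear_cache functions in the other modules (hypothetical, not part of this commit; assumes this module's imports):

// Hypothetical session-cache invalidation, covering all three namespaces.
pub async fn clear_cache(
    session: &Session,
    redis: &deadpool_redis::Pool,
) -> Result<(), DatabaseError> {
    let mut conn = redis.get().await?;
    cmd("DEL")
        .arg(format!("{}:{}", SESSIONS_NAMESPACE, session.id.0))
        .arg(format!("{}:{}", SESSIONS_IDS_NAMESPACE, session.session))
        .arg(format!("{}:{}", SESSIONS_USERS_NAMESPACE, session.user_id.0))
        .query_async::<_, ()>(&mut conn)
        .await?;
    Ok(())
}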

src/database/models/team_item.rs

@@ -1,8 +1,14 @@
use super::ids::*;
use crate::database::models::User;
use crate::models::teams::Permissions;
use crate::models::users::{Badges, RecipientType, RecipientWallet};
use crate::models::users::Badges;
use itertools::Itertools;
use redis::cmd;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
const TEAMS_NAMESPACE: &str = "teams";
const DEFAULT_EXPIRY: i64 = 1800;
pub struct TeamBuilder {
pub members: Vec<TeamMemberBuilder>,
@@ -90,6 +96,7 @@ pub struct TeamMember {
}
/// A member of a team
#[derive(Deserialize, Serialize)]
pub struct QueryTeamMember {
pub id: TeamMemberId,
pub team_id: TeamId,
@@ -107,81 +114,139 @@ impl TeamMember {
pub async fn get_from_team_full<'a, 'b, E>(
id: TeamId,
executor: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<QueryTeamMember>, super::DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
Self::get_from_team_full_many(&[id], executor).await
Self::get_from_team_full_many(&[id], executor, redis).await
}
pub async fn get_from_team_full_many<'a, E>(
team_ids: &[TeamId],
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<QueryTeamMember>, super::DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
if team_ids.is_empty() {
return Ok(Vec::new());
}
use futures::stream::TryStreamExt;
let team_ids_parsed: Vec<i64> = team_ids.iter().map(|x| x.0).collect();
let mut team_ids_parsed: Vec<i64> = team_ids.iter().map(|x| x.0).collect();
let teams = sqlx::query!(
"
SELECT tm.id id, tm.team_id team_id, tm.role member_role, tm.permissions permissions, tm.accepted accepted, tm.payouts_split payouts_split, tm.ordering,
u.id user_id, u.name user_name, u.email email, u.kratos_id kratos_id, u.github_id github_id,
u.avatar_url avatar_url, u.username username, u.bio bio,
u.created created, u.role user_role, u.badges badges, u.balance balance,
u.payout_wallet payout_wallet, u.payout_wallet_type payout_wallet_type,
u.payout_address payout_address
FROM team_members tm
INNER JOIN users u ON u.id = tm.user_id
WHERE tm.team_id = ANY($1)
ORDER BY tm.team_id, tm.ordering
",
&team_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
if let Some(m) = e.right() {
let mut redis = redis.get().await?;
Ok(Some(Ok(QueryTeamMember {
id: TeamMemberId(m.id),
team_id: TeamId(m.team_id),
role: m.member_role,
permissions: Permissions::from_bits(m.permissions as u64).unwrap_or_default(),
accepted: m.accepted,
user: User {
id: UserId(m.user_id),
github_id: m.github_id,
kratos_id: m.kratos_id,
name: m.user_name,
email: m.email,
avatar_url: m.avatar_url,
username: m.username,
bio: m.bio,
created: m.created,
role: m.user_role,
badges: Badges::from_bits(m.badges as u64).unwrap_or_default(),
balance: m.balance,
payout_wallet: m.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: m.payout_wallet_type.map(|x| RecipientType::from_string(&x)),
payout_address: m.payout_address,
},
payouts_split: m.payouts_split,
ordering: m.ordering,
})))
} else {
Ok(None)
}
})
.try_collect::<Vec<Result<QueryTeamMember, super::DatabaseError>>>()
.await?;
let mut found_teams = Vec::new();
let team_members = teams
.into_iter()
.collect::<Result<Vec<QueryTeamMember>, super::DatabaseError>>()?;
let teams = cmd("MGET")
.arg(
team_ids_parsed
.iter()
.map(|x| format!("{}:{}", TEAMS_NAMESPACE, x))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<String>>>(&mut redis)
.await?;
Ok(team_members)
for team_raw in teams {
if let Some(mut team) = team_raw
.clone()
.and_then(|x| serde_json::from_str::<Vec<QueryTeamMember>>(&x).ok())
{
if let Some(team_id) = team.first().map(|x| x.team_id) {
team_ids_parsed.retain(|x| &team_id.0 != x);
}
found_teams.append(&mut team);
continue;
}
}
if !team_ids_parsed.is_empty() {
let teams: Vec<QueryTeamMember> = sqlx::query!(
"
SELECT tm.id id, tm.team_id team_id, tm.role member_role, tm.permissions permissions, tm.accepted accepted, tm.payouts_split payouts_split, tm.ordering,
u.id user_id, u.name user_name,
u.avatar_url avatar_url, u.username username, u.bio bio,
u.created created, u.role user_role, u.badges badges
FROM team_members tm
INNER JOIN users u ON u.id = tm.user_id
WHERE tm.team_id = ANY($1)
ORDER BY tm.team_id, tm.ordering
",
&team_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|m|
QueryTeamMember {
id: TeamMemberId(m.id),
team_id: TeamId(m.team_id),
role: m.member_role,
permissions: Permissions::from_bits(m.permissions as u64).unwrap_or_default(),
accepted: m.accepted,
user: User {
id: UserId(m.user_id),
github_id: None,
discord_id: None,
gitlab_id: None,
google_id: None,
steam_id: None,
name: m.user_name,
email: None,
avatar_url: m.avatar_url,
username: m.username,
bio: m.bio,
created: m.created,
role: m.user_role,
badges: Badges::from_bits(m.badges as u64).unwrap_or_default(),
balance: Decimal::ZERO,
payout_wallet: None,
payout_wallet_type: None,
payout_address: None,
microsoft_id: None,
},
payouts_split: m.payouts_split,
ordering: m.ordering,
}
))
})
.try_collect::<Vec<QueryTeamMember>>()
.await?;
for (id, members) in &teams.into_iter().group_by(|x| x.team_id) {
let mut members = members.collect::<Vec<_>>();
cmd("SET")
.arg(format!("{}:{}", TEAMS_NAMESPACE, id.0))
.arg(serde_json::to_string(&members)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
found_teams.append(&mut members);
}
}
Ok(found_teams)
}
pub async fn clear_cache(
id: TeamId,
redis: &deadpool_redis::Pool,
) -> Result<(), super::DatabaseError> {
let mut redis = redis.get().await?;
cmd("DEL")
.arg(format!("{}:{}", TEAMS_NAMESPACE, id.0))
.query_async::<_, ()>(&mut redis)
.await?;
Ok(())
}
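A hypothetical call site pairing the cached read with explicit invalidation after a mutation:

// Hypothetical: first read hits Postgres and caches under "teams:{id}";
// after any mutation, drop the cached list so the next read repopulates.
async fn team_roundtrip(
    team_id: TeamId,
    pool: &sqlx::PgPool,
    redis: &deadpool_redis::Pool,
) -> Result<(), super::DatabaseError> {
    let members = TeamMember::get_from_team_full(team_id, pool, redis).await?;
    println!("team has {} members", members.len());
    TeamMember::clear_cache(team_id, redis).await?;
    Ok(())
}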
/// Gets a team member from a user id and team id. Does not return pending members.

src/database/models/user_item.rs

@@ -1,12 +1,28 @@
use super::ids::{ProjectId, UserId};
use crate::database::models::DatabaseError;
use crate::models::ids::base62_impl::{parse_base62, to_base62};
use crate::models::users::{Badges, RecipientType, RecipientWallet};
use chrono::{DateTime, Utc};
use redis::cmd;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
const USERS_NAMESPACE: &str = "users";
const USER_USERNAMES_NAMESPACE: &str = "users_usernames";
// const USERS_PROJECTS_NAMESPACE: &str = "users_projects";
const DEFAULT_EXPIRY: i64 = 1800; // 30 minutes
#[derive(Deserialize, Serialize)]
pub struct User {
pub id: UserId,
pub kratos_id: Option<String>, // None if legacy user unconnected to Minos/Kratos
pub github_id: Option<i64>,
pub discord_id: Option<i64>,
pub gitlab_id: Option<i64>,
pub google_id: Option<String>,
pub steam_id: Option<i64>,
pub microsoft_id: Option<String>,
pub username: String,
pub name: Option<String>,
pub email: Option<String>,
@@ -29,22 +45,29 @@ impl User {
sqlx::query!(
"
INSERT INTO users (
id, kratos_id, username, name, email,
avatar_url, bio, created
id, username, name, email,
avatar_url, bio, created,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8
$6, $7,
$8, $9, $10, $11, $12, $13
)
",
self.id as UserId,
self.kratos_id,
&self.username,
self.name.as_ref(),
self.email.as_ref(),
self.avatar_url.as_ref(),
self.bio.as_ref(),
self.created,
self.github_id,
self.discord_id,
self.gitlab_id,
self.google_id,
self.steam_id,
self.microsoft_id,
)
.execute(&mut *transaction)
.await?;
@@ -52,199 +75,192 @@ impl User {
Ok(())
}
pub async fn get<'a, 'b, E>(id: UserId, executor: E) -> Result<Option<Self>, sqlx::error::Error>
pub async fn get<'a, 'b, E>(
string: &str,
executor: E,
redis: &deadpool_redis::Pool,
) -> Result<Option<User>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
Self::get_many(&[id], executor)
User::get_many(&[string], executor, redis)
.await
.map(|x| x.into_iter().next())
}
pub async fn get_from_github_id<'a, 'b, E>(
github_id: u64,
pub async fn get_id<'a, 'b, E>(
id: UserId,
executor: E,
) -> Result<Option<Self>, sqlx::error::Error>
redis: &deadpool_redis::Pool,
) -> Result<Option<User>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let result = sqlx::query!(
"
SELECT u.id, u.name, u.email, u.kratos_id,
u.avatar_url, u.username, u.bio,
u.created, u.role, u.badges,
u.balance, u.payout_wallet, u.payout_wallet_type,
u.payout_address
FROM users u
WHERE u.github_id = $1
",
github_id as i64,
)
.fetch_optional(executor)
.await?;
if let Some(row) = result {
Ok(Some(User {
id: UserId(row.id),
github_id: Some(github_id as i64),
name: row.name,
email: row.email,
kratos_id: row.kratos_id,
avatar_url: row.avatar_url,
username: row.username,
bio: row.bio,
created: row.created,
role: row.role,
badges: Badges::from_bits(row.badges as u64).unwrap_or_default(),
balance: row.balance,
payout_wallet: row.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: row
.payout_wallet_type
.map(|x| RecipientType::from_string(&x)),
payout_address: row.payout_address,
}))
} else {
Ok(None)
}
User::get_many(&[crate::models::ids::UserId::from(id)], executor, redis)
.await
.map(|x| x.into_iter().next())
}
pub async fn get_from_minos_kratos_id<'a, 'b, E>(
kratos_id: String,
executor: E,
) -> Result<Option<Self>, sqlx::error::Error>
pub async fn get_many_ids<'a, E>(
user_ids: &[UserId],
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<User>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let result = sqlx::query!(
"
SELECT u.id, u.name, u.kratos_id, u.email, u.github_id,
u.avatar_url, u.username, u.bio,
u.created, u.role, u.badges,
u.balance, u.payout_wallet, u.payout_wallet_type,
u.payout_address
FROM users u
WHERE u.kratos_id = $1
",
kratos_id as String,
)
.fetch_optional(executor)
.await?;
if let Some(row) = result {
Ok(Some(User {
id: UserId(row.id),
kratos_id: row.kratos_id,
github_id: row.github_id,
name: row.name,
email: row.email,
avatar_url: row.avatar_url,
username: row.username,
bio: row.bio,
created: row.created,
role: row.role,
badges: Badges::from_bits(row.badges as u64).unwrap_or_default(),
balance: row.balance,
payout_wallet: row.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: row
.payout_wallet_type
.map(|x| RecipientType::from_string(&x)),
payout_address: row.payout_address,
}))
} else {
Ok(None)
}
let ids = user_ids
.iter()
.map(|x| crate::models::ids::UserId::from(*x))
.collect::<Vec<_>>();
User::get_many(&ids, exec, redis).await
}
pub async fn get_from_username<'a, 'b, E>(
username: String,
executor: E,
) -> Result<Option<Self>, sqlx::error::Error>
pub async fn get_many<'a, E, T: ToString>(
users_strings: &[T],
exec: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<User>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let result = sqlx::query!(
"
SELECT u.id, u.kratos_id, u.name, u.email, u.github_id,
u.avatar_url, u.username, u.bio,
u.created, u.role, u.badges,
u.balance, u.payout_wallet, u.payout_wallet_type,
u.payout_address
FROM users u
WHERE LOWER(u.username) = LOWER($1)
",
username
)
.fetch_optional(executor)
.await?;
use futures::TryStreamExt;
if let Some(row) = result {
Ok(Some(User {
id: UserId(row.id),
kratos_id: row.kratos_id,
github_id: row.github_id,
name: row.name,
email: row.email,
avatar_url: row.avatar_url,
username: row.username,
bio: row.bio,
created: row.created,
role: row.role,
badges: Badges::from_bits(row.badges as u64).unwrap_or_default(),
balance: row.balance,
payout_wallet: row.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: row
.payout_wallet_type
.map(|x| RecipientType::from_string(&x)),
payout_address: row.payout_address,
}))
} else {
Ok(None)
if users_strings.is_empty() {
return Ok(Vec::new());
}
}
pub async fn get_many<'a, E>(user_ids: &[UserId], exec: E) -> Result<Vec<User>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
use futures::stream::TryStreamExt;
let mut redis = redis.get().await?;
let user_ids_parsed: Vec<i64> = user_ids.iter().map(|x| x.0).collect();
let users = sqlx::query!(
"
SELECT u.id, u.kratos_id, u.name, u.email, u.github_id,
u.avatar_url, u.username, u.bio,
u.created, u.role, u.badges,
u.balance, u.payout_wallet, u.payout_wallet_type,
u.payout_address
FROM users u
WHERE u.id = ANY($1)
",
&user_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|u| User {
id: UserId(u.id),
kratos_id: u.kratos_id,
github_id: u.github_id,
name: u.name,
email: u.email,
avatar_url: u.avatar_url,
username: u.username,
bio: u.bio,
created: u.created,
role: u.role,
badges: Badges::from_bits(u.badges as u64).unwrap_or_default(),
balance: u.balance,
payout_wallet: u.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: u.payout_wallet_type.map(|x| RecipientType::from_string(&x)),
payout_address: u.payout_address,
}))
})
.try_collect::<Vec<User>>()
.await?;
let mut found_users = Vec::new();
let mut remaining_strings = users_strings
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>();
Ok(users)
let mut user_ids = users_strings
.iter()
.flat_map(|x| parse_base62(&x.to_string()).map(|x| x as i64))
.collect::<Vec<_>>();
user_ids.append(
&mut cmd("MGET")
.arg(
users_strings
.iter()
.map(|x| {
format!(
"{}:{}",
USER_USERNAMES_NAMESPACE,
x.to_string().to_lowercase()
)
})
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<i64>>>(&mut redis)
.await?
.into_iter()
.flatten()
.collect(),
);
if !user_ids.is_empty() {
let users = cmd("MGET")
.arg(
user_ids
.iter()
.map(|x| format!("{}:{}", USERS_NAMESPACE, x))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<String>>>(&mut redis)
.await?;
for user in users {
if let Some(user) = user.and_then(|x| serde_json::from_str::<User>(&x).ok()) {
remaining_strings
.retain(|x| &to_base62(user.id.0 as u64) != x && user.username.to_lowercase() != x.to_lowercase());
found_users.push(user);
continue;
}
}
}
if !remaining_strings.is_empty() {
let user_ids_parsed: Vec<i64> = remaining_strings
.iter()
.flat_map(|x| parse_base62(&x.to_string()).ok())
.map(|x| x as i64)
.collect();
let db_users: Vec<User> = sqlx::query!(
"
SELECT id, name, email,
avatar_url, username, bio,
created, role, badges,
balance, payout_wallet, payout_wallet_type, payout_address,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id
FROM users
WHERE id = ANY($1) OR LOWER(username) = ANY($2)
",
&user_ids_parsed,
&remaining_strings
.into_iter()
.map(|x| x.to_string().to_lowercase())
.collect::<Vec<_>>(),
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|u| User {
id: UserId(u.id),
github_id: u.github_id,
discord_id: u.discord_id,
gitlab_id: u.gitlab_id,
google_id: u.google_id,
steam_id: u.steam_id,
microsoft_id: u.microsoft_id,
name: u.name,
email: u.email,
avatar_url: u.avatar_url,
username: u.username,
bio: u.bio,
created: u.created,
role: u.role,
badges: Badges::from_bits(u.badges as u64).unwrap_or_default(),
balance: u.balance,
payout_wallet: u.payout_wallet.map(|x| RecipientWallet::from_string(&x)),
payout_wallet_type: u
.payout_wallet_type
.map(|x| RecipientType::from_string(&x)),
payout_address: u.payout_address,
}))
})
.try_collect::<Vec<User>>()
.await?;
for user in db_users {
cmd("SET")
.arg(format!("{}:{}", USERS_NAMESPACE, user.id.0))
.arg(serde_json::to_string(&user)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
cmd("SET")
.arg(format!(
"{}:{}",
USER_USERNAMES_NAMESPACE,
user.username.to_lowercase()
))
.arg(user.id.0)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
found_users.push(user);
}
}
Ok(found_users)
}
pub async fn get_projects<'a, E>(
@@ -273,321 +289,207 @@ impl User {
Ok(projects)
}
pub async fn remove(
id: UserId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<()>, sqlx::error::Error> {
let deleted_user: UserId = crate::models::users::DELETED_USER.into();
pub async fn clear_caches(
user_ids: &[(UserId, Option<String>)],
redis: &deadpool_redis::Pool,
) -> Result<(), DatabaseError> {
let mut redis = redis.get().await?;
let mut cmd = cmd("DEL");
sqlx::query!(
"
UPDATE team_members
SET user_id = $1
WHERE (user_id = $2 AND role = $3)
",
deleted_user as UserId,
id as UserId,
crate::models::teams::OWNER_ROLE
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
UPDATE versions
SET author_id = $1
WHERE (author_id = $2)
",
deleted_user as UserId,
id as UserId,
)
.execute(&mut *transaction)
.await?;
use futures::TryStreamExt;
let notifications: Vec<i64> = sqlx::query!(
"
SELECT n.id FROM notifications n
WHERE n.user_id = $1
",
id as UserId,
)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| m.id)) })
.try_collect::<Vec<i64>>()
.await?;
sqlx::query!(
"
DELETE FROM notifications
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM reports
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM mod_follows
WHERE follower_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM notifications_actions
WHERE notification_id = ANY($1)
",
&notifications
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM team_members
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM payouts_values
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM historical_payouts
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM users
WHERE id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
Ok(Some(()))
}
pub async fn remove_full(
id: UserId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<()>, sqlx::error::Error> {
use futures::TryStreamExt;
let projects: Vec<ProjectId> = sqlx::query!(
"
SELECT m.id FROM mods m
INNER JOIN team_members tm ON tm.team_id = m.team_id
WHERE tm.user_id = $1 AND tm.role = $2
",
id as UserId,
crate::models::teams::OWNER_ROLE
)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| ProjectId(m.id))) })
.try_collect::<Vec<ProjectId>>()
.await?;
for project_id in projects {
let _result =
super::project_item::Project::remove_full(project_id, transaction).await?;
}
let notifications: Vec<i64> = sqlx::query!(
"
SELECT n.id FROM notifications n
WHERE n.user_id = $1
",
id as UserId,
)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| m.id)) })
.try_collect::<Vec<i64>>()
.await?;
sqlx::query!(
"
DELETE FROM notifications
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM notifications_actions
WHERE notification_id = ANY($1)
",
&notifications
)
.execute(&mut *transaction)
.await?;
let deleted_user: UserId = crate::models::users::DELETED_USER.into();
sqlx::query!(
"
UPDATE versions
SET author_id = $1
WHERE (author_id = $2)
",
deleted_user as UserId,
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM team_members
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
r#"
UPDATE threads_messages
SET body = '{"type": "deleted"}', author_id = $2
WHERE author_id = $1
"#,
id as UserId,
deleted_user as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM threads_members
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM users
WHERE id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
Ok(Some(()))
}
pub async fn get_id_from_username_or_id<'a, 'b, E>(
username_or_id: &str,
executor: E,
) -> Result<Option<UserId>, sqlx::error::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
let id_option = crate::models::ids::base62_impl::parse_base62(username_or_id).ok();
if let Some(id) = id_option {
let id = UserId(id as i64);
let mut user_id = sqlx::query!(
"
SELECT id FROM users
WHERE id = $1
",
id as UserId
)
.fetch_optional(executor)
.await?
.map(|x| UserId(x.id));
if user_id.is_none() {
user_id = sqlx::query!(
"
SELECT id FROM users
WHERE LOWER(username) = LOWER($1)
",
username_or_id
)
.fetch_optional(executor)
.await?
.map(|x| UserId(x.id));
for (id, username) in user_ids {
cmd.arg(format!("{}:{}", USERS_NAMESPACE, id.0));
if let Some(username) = username {
cmd.arg(format!(
"{}:{}",
USER_USERNAMES_NAMESPACE,
username.to_lowercase()
));
}
Ok(user_id)
} else {
let id = sqlx::query!(
"
SELECT id FROM users
WHERE LOWER(username) = LOWER($1)
",
username_or_id
)
.fetch_optional(executor)
.await?;
Ok(id.map(|x| UserId(x.id)))
}
}
pub async fn merge_minos_user<'a, 'b, E>(
&self,
kratos_id: &str,
executor: E,
) -> Result<(), sqlx::error::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
// If the user exists, link the Minos user into the existing user rather than create a new one
sqlx::query!(
"
UPDATE users
SET kratos_id = $1
WHERE (id = $2)
",
kratos_id,
self.id.0,
)
.execute(executor)
.await?;
cmd.query_async::<_, ()>(&mut redis).await?;
Ok(())
}
pub async fn remove(
id: UserId,
full: bool,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
redis: &deadpool_redis::Pool,
) -> Result<Option<()>, DatabaseError> {
let user = Self::get_id(id, &mut *transaction, redis).await?;
if let Some(delete_user) = user {
User::clear_caches(&[(id, Some(delete_user.username))], redis).await?;
let deleted_user: UserId = crate::models::users::DELETED_USER.into();
if full {
let projects: Vec<ProjectId> = sqlx::query!(
"
SELECT m.id FROM mods m
INNER JOIN team_members tm ON tm.team_id = m.team_id
WHERE tm.user_id = $1 AND tm.role = $2
",
id as UserId,
crate::models::teams::OWNER_ROLE
)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| ProjectId(m.id))) })
.try_collect::<Vec<ProjectId>>()
.await?;
for project_id in projects {
let _result =
super::project_item::Project::remove(project_id, transaction, redis)
.await?;
}
} else {
sqlx::query!(
"
UPDATE team_members
SET user_id = $1
WHERE (user_id = $2 AND role = $3)
",
deleted_user as UserId,
id as UserId,
crate::models::teams::OWNER_ROLE
)
.execute(&mut *transaction)
.await?;
}
sqlx::query!(
"
UPDATE versions
SET author_id = $1
WHERE (author_id = $2)
",
deleted_user as UserId,
id as UserId,
)
.execute(&mut *transaction)
.await?;
use futures::TryStreamExt;
let notifications: Vec<i64> = sqlx::query!(
"
SELECT n.id FROM notifications n
WHERE n.user_id = $1
",
id as UserId,
)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async { Ok(e.right().map(|m| m.id)) })
.try_collect::<Vec<i64>>()
.await?;
sqlx::query!(
"
DELETE FROM notifications
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM notifications_actions
WHERE notification_id = ANY($1)
",
&notifications
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM reports
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM mod_follows
WHERE follower_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM team_members
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM payouts_values
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM historical_payouts
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
r#"
UPDATE threads_messages
SET body = '{"type": "deleted"}', author_id = $2
WHERE author_id = $1
"#,
id as UserId,
deleted_user as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM threads_members
WHERE user_id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM users
WHERE id = $1
",
id as UserId,
)
.execute(&mut *transaction)
.await?;
Ok(Some(()))
} else {
Ok(None)
}
}
}
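`User::get` now resolves either a base62 ID or a username through the caches above. A hypothetical call site (the ID string is illustrative):

// Hypothetical: both forms resolve through the same cache-aside path.
async fn lookup_examples(
    pool: &sqlx::PgPool,
    redis: &deadpool_redis::Pool,
) -> Result<(), DatabaseError> {
    // By username, matched case-insensitively via the users_usernames namespace:
    let by_name = User::get("some_username", pool, redis).await?;
    // By base62-encoded ID:
    let by_id = User::get("AbCdEf12", pool, redis).await?;
    println!("{} {}", by_name.is_some(), by_id.is_some());
    Ok(())
}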

src/database/models/version_item.rs

@@ -1,12 +1,20 @@
use super::ids::*;
use super::DatabaseError;
use crate::models::ids::base62_impl::parse_base62;
use crate::models::projects::{FileType, VersionStatus, VersionType};
use crate::models::projects::{FileType, VersionStatus};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use itertools::Itertools;
use redis::cmd;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
const VERSIONS_NAMESPACE: &str = "versions";
// TODO: Cache version slugs call
// const VERSIONS_SLUGS_NAMESPACE: &str = "versions_slugs";
const VERSION_FILES_NAMESPACE: &str = "versions_files";
const DEFAULT_EXPIRY: i64 = 1800; // 30 minutes
pub struct VersionBuilder {
pub version_id: VersionId,
pub project_id: ProjectId,
@@ -199,7 +207,7 @@ impl VersionBuilder {
}
}
#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct Version {
pub id: VersionId,
pub project_id: ProjectId,
@@ -254,20 +262,18 @@ impl Version {
pub async fn remove_full(
id: VersionId,
redis: &deadpool_redis::Pool,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<Option<()>, sqlx::Error> {
let result = sqlx::query!(
"
SELECT EXISTS(SELECT 1 FROM versions WHERE id = $1)
",
id as VersionId,
)
.fetch_one(&mut *transaction)
.await?;
) -> Result<Option<()>, DatabaseError> {
let result = Self::get(id, &mut *transaction, redis).await?;
if !result.exists.unwrap_or(false) {
let result = if let Some(result) = result {
result
} else {
return Ok(None);
}
};
Version::clear_cache(&result, redis).await?;
sqlx::query!(
"
@@ -374,276 +380,383 @@ impl Version {
.execute(&mut *transaction)
.await?;
crate::database::models::Project::update_game_versions(
ProjectId(project_id.mod_id),
&mut *transaction,
)
.await?;
crate::database::models::Project::update_loaders(
ProjectId(project_id.mod_id),
&mut *transaction,
)
.await?;
Ok(Some(()))
}
pub async fn get_project_versions<'a, E>(
project_id: ProjectId,
game_versions: Option<Vec<String>>,
loaders: Option<Vec<String>>,
version_type: Option<VersionType>,
limit: Option<u32>,
offset: Option<u32>,
exec: E,
) -> Result<Vec<VersionId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
let vec = sqlx::query!(
"
SELECT DISTINCT ON(v.date_published, v.id) version_id, v.date_published FROM versions v
INNER JOIN game_versions_versions gvv ON gvv.joining_version_id = v.id
INNER JOIN game_versions gv on gvv.game_version_id = gv.id AND (cardinality($2::varchar[]) = 0 OR gv.version = ANY($2::varchar[]))
INNER JOIN loaders_versions lv ON lv.version_id = v.id
INNER JOIN loaders l on lv.loader_id = l.id AND (cardinality($3::varchar[]) = 0 OR l.loader = ANY($3::varchar[]))
WHERE v.mod_id = $1 AND ($4::varchar IS NULL OR v.version_type = $4)
ORDER BY v.date_published DESC, v.id
LIMIT $5 OFFSET $6
",
project_id as ProjectId,
&game_versions.unwrap_or_default(),
&loaders.unwrap_or_default(),
version_type.map(|x| x.as_str()),
limit.map(|x| x as i64),
offset.map(|x| x as i64),
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|v| VersionId(v.version_id))) })
.try_collect::<Vec<VersionId>>()
.await?;
Ok(vec)
}
pub async fn get_projects_versions<'a, E>(
project_ids: Vec<ProjectId>,
game_versions: Option<Vec<String>>,
loaders: Option<Vec<String>>,
version_type: Option<VersionType>,
limit: Option<u32>,
offset: Option<u32>,
exec: E,
) -> Result<HashMap<ProjectId, Vec<VersionId>>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
let vec = sqlx::query!(
"
SELECT DISTINCT ON(v.date_published, v.id) version_id, v.mod_id, v.date_published FROM versions v
INNER JOIN game_versions_versions gvv ON gvv.joining_version_id = v.id
INNER JOIN game_versions gv on gvv.game_version_id = gv.id AND (cardinality($2::varchar[]) = 0 OR gv.version = ANY($2::varchar[]))
INNER JOIN loaders_versions lv ON lv.version_id = v.id
INNER JOIN loaders l on lv.loader_id = l.id AND (cardinality($3::varchar[]) = 0 OR l.loader = ANY($3::varchar[]))
WHERE v.mod_id = ANY($1) AND ($4::varchar IS NULL OR v.version_type = $4)
ORDER BY v.date_published, v.id ASC
LIMIT $5 OFFSET $6
",
&project_ids.into_iter().map(|x| x.0).collect::<Vec<i64>>(),
&game_versions.unwrap_or_default(),
&loaders.unwrap_or_default(),
version_type.map(|x| x.as_str()),
limit.map(|x| x as i64),
offset.map(|x| x as i64),
)
.fetch_many(exec)
.try_filter_map(|e| async { Ok(e.right().map(|v| (ProjectId(v.mod_id), VersionId(v.version_id)))) })
.try_collect::<Vec<(ProjectId, VersionId)>>()
.await?;
let mut map: HashMap<ProjectId, Vec<VersionId>> = HashMap::new();
for (project_id, version_id) in vec {
if let Some(value) = map.get_mut(&project_id) {
value.push(version_id);
} else {
map.insert(project_id, vec![version_id]);
}
}
Ok(map)
}
pub async fn get_full<'a, 'b, E>(
pub async fn get<'a, 'b, E>(
id: VersionId,
executor: E,
) -> Result<Option<QueryVersion>, sqlx::error::Error>
redis: &deadpool_redis::Pool,
) -> Result<Option<QueryVersion>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
Self::get_many_full(&[id], executor)
Self::get_many(&[id], executor, redis)
.await
.map(|x| x.into_iter().next())
}
pub async fn get_many_full<'a, E>(
pub async fn get_many<'a, E>(
version_ids: &[VersionId],
exec: E,
) -> Result<Vec<QueryVersion>, sqlx::Error>
redis: &deadpool_redis::Pool,
) -> Result<Vec<QueryVersion>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
if version_ids.is_empty() {
return Ok(Vec::new());
}
use futures::stream::TryStreamExt;
let mut version_ids_parsed: Vec<i64> = version_ids.iter().map(|x| x.0).collect();
let mut redis = redis.get().await?;
let mut found_versions = Vec::new();
let versions = cmd("MGET")
.arg(
version_ids_parsed
.iter()
.map(|x| format!("{}:{}", VERSIONS_NAMESPACE, x))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<String>>>(&mut redis)
.await?;
for version in versions {
if let Some(version) =
version.and_then(|x| serde_json::from_str::<QueryVersion>(&x).ok())
{
version_ids_parsed.retain(|x| &version.inner.id.0 != x);
found_versions.push(version);
continue;
}
}
if !version_ids_parsed.is_empty() {
let db_versions: Vec<QueryVersion> = sqlx::query!(
"
SELECT v.id id, v.mod_id mod_id, v.author_id author_id, v.name version_name, v.version_number version_number,
v.changelog changelog, v.date_published date_published, v.downloads downloads,
v.version_type version_type, v.featured featured, v.status status, v.requested_status requested_status,
JSONB_AGG(DISTINCT jsonb_build_object('version', gv.version, 'created', gv.created)) filter (where gv.version is not null) game_versions,
ARRAY_AGG(DISTINCT l.loader) filter (where l.loader is not null) loaders,
JSONB_AGG(DISTINCT jsonb_build_object('id', f.id, 'url', f.url, 'filename', f.filename, 'primary', f.is_primary, 'size', f.size, 'file_type', f.file_type)) filter (where f.id is not null) files,
JSONB_AGG(DISTINCT jsonb_build_object('algorithm', h.algorithm, 'hash', encode(h.hash, 'escape'), 'file_id', h.file_id)) filter (where h.hash is not null) hashes,
JSONB_AGG(DISTINCT jsonb_build_object('project_id', d.mod_dependency_id, 'version_id', d.dependency_id, 'dependency_type', d.dependency_type,'file_name', dependency_file_name)) filter (where d.dependency_type is not null) dependencies
FROM versions v
LEFT OUTER JOIN game_versions_versions gvv on v.id = gvv.joining_version_id
LEFT OUTER JOIN game_versions gv on gvv.game_version_id = gv.id
LEFT OUTER JOIN loaders_versions lv on v.id = lv.version_id
LEFT OUTER JOIN loaders l on lv.loader_id = l.id
LEFT OUTER JOIN files f on v.id = f.version_id
LEFT OUTER JOIN hashes h on f.id = h.file_id
LEFT OUTER JOIN dependencies d on v.id = d.dependent_id
WHERE v.id = ANY($1)
GROUP BY v.id
ORDER BY v.date_published ASC;
",
&version_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|v|
QueryVersion {
inner: Version {
id: VersionId(v.id),
project_id: ProjectId(v.mod_id),
author_id: UserId(v.author_id),
name: v.version_name,
version_number: v.version_number,
changelog: v.changelog,
changelog_url: None,
date_published: v.date_published,
downloads: v.downloads,
version_type: v.version_type,
featured: v.featured,
status: VersionStatus::from_str(&v.status),
requested_status: v.requested_status
.map(|x| VersionStatus::from_str(&x)),
},
files: {
#[derive(Deserialize)]
struct Hash {
pub file_id: FileId,
pub algorithm: String,
pub hash: String,
}
#[derive(Deserialize)]
struct File {
pub id: FileId,
pub url: String,
pub filename: String,
pub primary: bool,
pub size: u32,
pub file_type: Option<FileType>,
}
let hashes: Vec<Hash> = serde_json::from_value(
v.hashes.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
let files: Vec<File> = serde_json::from_value(
v.files.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
let mut files = files.into_iter().map(|x| {
let mut file_hashes = HashMap::new();
for hash in &hashes {
if hash.file_id == x.id {
file_hashes.insert(
hash.algorithm.clone(),
hash.hash.clone(),
);
}
}
QueryFile {
id: x.id,
url: x.url,
filename: x.filename,
hashes: file_hashes,
primary: x.primary,
size: x.size,
file_type: x.file_type,
}
}).collect::<Vec<_>>();
files.sort_by(|a, b| {
if a.primary {
Ordering::Less
} else if b.primary {
Ordering::Greater
} else {
a.filename.cmp(&b.filename)
}
});
files
},
game_versions: {
#[derive(Deserialize)]
struct GameVersion {
pub version: String,
pub created: DateTime<Utc>,
}
let mut game_versions: Vec<GameVersion> = serde_json::from_value(
v.game_versions.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
game_versions.sort_by(|a, b| a.created.cmp(&b.created));
game_versions.into_iter().map(|x| x.version).collect()
},
loaders: v.loaders.unwrap_or_default(),
dependencies: serde_json::from_value(
v.dependencies.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
))
})
.try_collect::<Vec<QueryVersion>>()
.await?;
for version in db_versions {
cmd("SET")
.arg(format!("{}:{}", VERSIONS_NAMESPACE, version.inner.id.0))
.arg(serde_json::to_string(&version)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
found_versions.push(version);
}
}
Ok(found_versions)
}
pub async fn get_file_from_hash<'a, 'b, E>(
algo: String,
hash: String,
version_id: Option<VersionId>,
executor: E,
redis: &deadpool_redis::Pool,
) -> Result<Option<SingleFile>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
use futures::stream::TryStreamExt;
let version_ids_parsed: Vec<i64> = version_ids.iter().map(|x| x.0).collect();
sqlx::query!(
"
SELECT v.id id, v.mod_id mod_id, v.author_id author_id, v.name version_name, v.version_number version_number,
v.changelog changelog, v.date_published date_published, v.downloads downloads,
v.version_type version_type, v.featured featured, v.status status, v.requested_status requested_status,
JSONB_AGG(DISTINCT jsonb_build_object('version', gv.version, 'created', gv.created)) filter (where gv.version is not null) game_versions,
ARRAY_AGG(DISTINCT l.loader) filter (where l.loader is not null) loaders,
JSONB_AGG(DISTINCT jsonb_build_object('id', f.id, 'url', f.url, 'filename', f.filename, 'primary', f.is_primary, 'size', f.size, 'file_type', f.file_type)) filter (where f.id is not null) files,
JSONB_AGG(DISTINCT jsonb_build_object('algorithm', h.algorithm, 'hash', encode(h.hash, 'escape'), 'file_id', h.file_id)) filter (where h.hash is not null) hashes,
JSONB_AGG(DISTINCT jsonb_build_object('project_id', d.mod_dependency_id, 'version_id', d.dependency_id, 'dependency_type', d.dependency_type,'file_name', dependency_file_name)) filter (where d.dependency_type is not null) dependencies
FROM versions v
LEFT OUTER JOIN game_versions_versions gvv on v.id = gvv.joining_version_id
LEFT OUTER JOIN game_versions gv on gvv.game_version_id = gv.id
LEFT OUTER JOIN loaders_versions lv on v.id = lv.version_id
LEFT OUTER JOIN loaders l on lv.loader_id = l.id
LEFT OUTER JOIN files f on v.id = f.version_id
LEFT OUTER JOIN hashes h on f.id = h.file_id
LEFT OUTER JOIN dependencies d on v.id = d.dependent_id
WHERE v.id = ANY($1)
GROUP BY v.id
ORDER BY v.date_published ASC;
",
&version_ids_parsed
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|v|
QueryVersion {
inner: Version {
id: VersionId(v.id),
project_id: ProjectId(v.mod_id),
author_id: UserId(v.author_id),
name: v.version_name,
version_number: v.version_number,
changelog: v.changelog,
changelog_url: None,
date_published: v.date_published,
downloads: v.downloads,
version_type: v.version_type,
featured: v.featured,
status: VersionStatus::from_str(&v.status),
requested_status: v.requested_status
.map(|x| VersionStatus::from_str(&x)),
},
files: {
#[derive(Deserialize)]
struct Hash {
pub file_id: FileId,
pub algorithm: String,
pub hash: String,
}
#[derive(Deserialize)]
struct File {
pub id: FileId,
pub url: String,
pub filename: String,
pub primary: bool,
pub size: u32,
pub file_type: Option<FileType>,
}
let hashes: Vec<Hash> = serde_json::from_value(
v.hashes.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
let files: Vec<File> = serde_json::from_value(
v.files.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
let mut files = files.into_iter().map(|x| {
let mut file_hashes = HashMap::new();
for hash in &hashes {
if hash.file_id == x.id {
file_hashes.insert(
hash.algorithm.clone(),
hash.hash.clone(),
);
}
}
QueryFile {
id: x.id,
url: x.url,
filename: x.filename,
hashes: file_hashes,
primary: x.primary,
size: x.size,
file_type: x.file_type,
}
}).collect::<Vec<_>>();
files.sort_by(|a, b| {
if a.primary {
Ordering::Less
} else if b.primary {
Ordering::Greater
} else {
a.filename.cmp(&b.filename)
}
});
files
},
game_versions: {
#[derive(Deserialize)]
struct GameVersion {
pub version: String,
pub created: DateTime<Utc>,
}
let mut game_versions: Vec<GameVersion> = serde_json::from_value(
v.game_versions.unwrap_or_default(),
)
.ok()
.unwrap_or_default();
game_versions.sort_by(|a, b| a.created.cmp(&b.created));
game_versions.into_iter().map(|x| x.version).collect()
},
loaders: v.loaders.unwrap_or_default(),
dependencies: serde_json::from_value(
v.dependencies.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
))
})
.try_collect::<Vec<QueryVersion>>()
Self::get_files_from_hash(algo, &[hash], executor, redis)
.await
.map(|x| {
x.into_iter()
.find_or_first(|x| Some(x.version_id) == version_id)
})
}
pub async fn get_files_from_hash<'a, 'b, E>(
algorithm: String,
hashes: &[String],
executor: E,
redis: &deadpool_redis::Pool,
) -> Result<Vec<SingleFile>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
if hashes.is_empty() {
return Ok(Vec::new());
}
use futures::stream::TryStreamExt;
let mut file_ids_parsed = hashes.to_vec();
let mut redis = redis.get().await?;
let mut found_files = Vec::new();
let files = cmd("MGET")
.arg(
file_ids_parsed
.iter()
.map(|hash| format!("{}:{}_{}", VERSION_FILES_NAMESPACE, algorithm, hash))
.collect::<Vec<_>>(),
)
.query_async::<_, Vec<Option<String>>>(&mut redis)
.await?;
for file in files {
if let Some(mut file) =
file.and_then(|x| serde_json::from_str::<Vec<SingleFile>>(&x).ok())
{
file_ids_parsed.retain(|x| {
!file
.iter()
.any(|y| y.hashes.iter().any(|z| z.0 == &algorithm && z.1 == x))
});
found_files.append(&mut file);
continue;
}
}
if !file_ids_parsed.is_empty() {
let db_files: Vec<SingleFile> = sqlx::query!(
"
SELECT f.id, f.version_id, v.mod_id, f.url, f.filename, f.is_primary, f.size, f.file_type,
JSONB_AGG(DISTINCT jsonb_build_object('algorithm', h.algorithm, 'hash', encode(h.hash, 'escape'))) filter (where h.hash is not null) hashes
FROM files f
INNER JOIN versions v on v.id = f.version_id
INNER JOIN hashes h on h.file_id = f.id
WHERE h.algorithm = $1 AND h.hash = ANY($2)
GROUP BY f.id, v.mod_id, v.date_published
ORDER BY v.date_published
",
algorithm,
&file_ids_parsed.into_iter().map(|x| x.as_bytes().to_vec()).collect::<Vec<_>>(),
)
.fetch_many(executor)
.try_filter_map(|e| async {
Ok(e.right().map(|f| {
#[derive(Deserialize)]
struct Hash {
pub algorithm: String,
pub hash: String,
}
SingleFile {
id: FileId(f.id),
version_id: VersionId(f.version_id),
project_id: ProjectId(f.mod_id),
url: f.url,
filename: f.filename,
hashes: serde_json::from_value::<Vec<Hash>>(
f.hashes.unwrap_or_default(),
)
.ok()
.unwrap_or_default().into_iter().map(|x| (x.algorithm, x.hash)).collect(),
primary: f.is_primary,
size: f.size as u32,
file_type: f.file_type.map(|x| FileType::from_str(&x)),
}
}
))
})
.try_collect::<Vec<SingleFile>>()
.await?;
let mut save_files: HashMap<String, Vec<SingleFile>> = HashMap::new();
for file in db_files {
for (algo, hash) in &file.hashes {
let key = format!("{}_{}", algo, hash);
if let Some(files) = save_files.get_mut(&key) {
files.push(file.clone());
} else {
save_files.insert(key, vec![file.clone()]);
}
}
}
for (key, mut files) in save_files {
cmd("SET")
.arg(format!("{}:{}", VERSIONS_NAMESPACE, key))
.arg(serde_json::to_string(&files)?)
.arg("EX")
.arg(DEFAULT_EXPIRY)
.query_async::<_, ()>(&mut redis)
.await?;
found_files.append(&mut files);
}
}
Ok(found_files)
}
pub async fn clear_cache(
version: &QueryVersion,
redis: &deadpool_redis::Pool,
) -> Result<(), DatabaseError> {
let mut redis = redis.get().await?;
let mut cmd = cmd("DEL");
cmd.arg(format!("{}:{}", VERSIONS_NAMESPACE, version.inner.id.0));
for file in &version.files {
for (algo, hash) in &file.hashes {
cmd.arg(format!("{}:{}_{}", VERSION_FILES_NAMESPACE, algo, hash));
}
}
cmd.query_async::<_, ()>(&mut redis).await?;
Ok(())
}
// TODO: Needs to be cached
pub async fn get_full_from_id_slug<'a, 'b, E>(
project_id_or_slug: &str,
slug: &str,
executor: E,
) -> Result<Option<QueryVersion>, sqlx::error::Error>
redis: &deadpool_redis::Pool,
) -> Result<Option<QueryVersion>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
@@ -665,14 +778,14 @@ impl Version {
.await?;
if let Some(version_id) = id {
Version::get_full(VersionId(version_id.id), executor).await
Ok(Version::get(VersionId(version_id.id), executor, redis).await?)
} else {
Ok(None)
}
}
}
#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct QueryVersion {
pub inner: Version,
@@ -682,7 +795,7 @@ pub struct QueryVersion {
pub dependencies: Vec<QueryDependency>,
}
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Serialize)]
pub struct QueryDependency {
pub project_id: Option<ProjectId>,
pub version_id: Option<VersionId>,
@@ -690,7 +803,7 @@ pub struct QueryDependency {
pub dependency_type: String,
}
#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct QueryFile {
pub id: FileId,
pub url: String,
@@ -700,3 +813,16 @@ pub struct QueryFile {
pub size: u32,
pub file_type: Option<FileType>,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct SingleFile {
pub id: FileId,
pub version_id: VersionId,
pub project_id: ProjectId,
pub url: String,
pub filename: String,
pub hashes: HashMap<String, String>,
pub primary: bool,
pub size: u32,
pub file_type: Option<FileType>,
}
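A hypothetical call site for the new hash lookup (the sha1 value is illustrative):

// Hypothetical: resolve a file by its sha1, with no preferred version,
// so find_or_first falls back to the earliest published match.
async fn find_file(
    pool: &sqlx::PgPool,
    redis: &deadpool_redis::Pool,
) -> Result<(), DatabaseError> {
    let file = Version::get_file_from_hash(
        "sha1".to_string(),
        "aaf655a6ef9f3d9a3b52e2dbbd9e3a22f96632f6".to_string(), // illustrative hash
        None,
        pool,
        redis,
    )
    .await?;
    println!("found: {}", file.map(|f| f.filename).unwrap_or_default());
    Ok(())
}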