You've already forked AstralRinth
forked from didirus/AstralRinth
* WIP end-of-day push * Authorize endpoint, accept endpoints, DB stuff for oauth clients, their redirects, and client authorizations * OAuth Client create route * Get user clients * Client delete * Edit oauth client * Include redirects in edit client route * Database stuff for tokens * Reorg oauth stuff out of auth/flows and into its own module * Impl OAuth get access token endpoint * Accept oauth access tokens as auth and update through AuthQueue * User OAuth authorization management routes * Forgot to actually add the routes lol * Bit o cleanup * Happy path test for OAuth and minor fixes for things it found * Add dummy data oauth client (and detect/handle dummy data version changes) * More tests * Another test * More tests and reject endpoint * Test oauth client and authorization management routes * cargo sqlx prepare * dead code warning * Auto clippy fixes * Uri refactoring * minor name improvement * Don't compile-time check the test sqlx queries * Trying to fix db concurrency problem to get tests to pass * Try fix from test PR * Fixes for updated sqlx * Prevent restricted scopes from being requested or issued * Get OAuth client(s) * Remove joined oauth client info from authorization returns * Add default conversion to OAuthError::error so we can use ? * Rework routes * Consolidate scopes into SESSION_ACCESS * Cargo sqlx prepare * Parse to OAuthClientId automatically through serde and actix * Cargo clippy * Remove validation requiring 1 redirect URI on oauth client creation * Use serde(flatten) on OAuthClientCreationResult
164 lines
5.3 KiB
Rust
164 lines
5.3 KiB
Rust
use crate::auth::session::SessionMetadata;
|
|
use crate::database::models::pat_item::PersonalAccessToken;
|
|
use crate::database::models::session_item::Session;
|
|
use crate::database::models::{DatabaseError, OAuthAccessTokenId, PatId, SessionId, UserId};
|
|
use crate::database::redis::RedisPool;
|
|
use chrono::Utc;
|
|
use itertools::Itertools;
|
|
use sqlx::PgPool;
|
|
use std::collections::{HashMap, HashSet};
|
|
use tokio::sync::Mutex;
|
|
|
|
pub struct AuthQueue {
|
|
session_queue: Mutex<HashMap<SessionId, SessionMetadata>>,
|
|
pat_queue: Mutex<HashSet<PatId>>,
|
|
oauth_access_token_queue: Mutex<HashSet<OAuthAccessTokenId>>,
|
|
}
|
|
|
|
impl Default for AuthQueue {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
// Batches session accessing transactions every 30 seconds
|
|
impl AuthQueue {
|
|
pub fn new() -> Self {
|
|
AuthQueue {
|
|
session_queue: Mutex::new(HashMap::with_capacity(1000)),
|
|
pat_queue: Mutex::new(HashSet::with_capacity(1000)),
|
|
oauth_access_token_queue: Mutex::new(HashSet::with_capacity(1000)),
|
|
}
|
|
}
|
|
pub async fn add_session(&self, id: SessionId, metadata: SessionMetadata) {
|
|
self.session_queue.lock().await.insert(id, metadata);
|
|
}
|
|
|
|
pub async fn add_pat(&self, id: PatId) {
|
|
self.pat_queue.lock().await.insert(id);
|
|
}
|
|
|
|
pub async fn add_oauth_access_token(&self, id: crate::database::models::OAuthAccessTokenId) {
|
|
self.oauth_access_token_queue.lock().await.insert(id);
|
|
}
|
|
|
|
pub async fn take_sessions(&self) -> HashMap<SessionId, SessionMetadata> {
|
|
let mut queue = self.session_queue.lock().await;
|
|
let len = queue.len();
|
|
|
|
std::mem::replace(&mut queue, HashMap::with_capacity(len))
|
|
}
|
|
|
|
pub async fn take_hashset<T>(queue: &Mutex<HashSet<T>>) -> HashSet<T> {
|
|
let mut queue = queue.lock().await;
|
|
let len = queue.len();
|
|
|
|
std::mem::replace(&mut queue, HashSet::with_capacity(len))
|
|
}
|
|
|
|
pub async fn index(&self, pool: &PgPool, redis: &RedisPool) -> Result<(), DatabaseError> {
|
|
let session_queue = self.take_sessions().await;
|
|
let pat_queue = Self::take_hashset(&self.pat_queue).await;
|
|
let oauth_access_token_queue = Self::take_hashset(&self.oauth_access_token_queue).await;
|
|
|
|
if !session_queue.is_empty()
|
|
|| !pat_queue.is_empty()
|
|
|| !oauth_access_token_queue.is_empty()
|
|
{
|
|
let mut transaction = pool.begin().await?;
|
|
let mut clear_cache_sessions = Vec::new();
|
|
|
|
for (id, metadata) in session_queue {
|
|
clear_cache_sessions.push((Some(id), None, None));
|
|
|
|
sqlx::query!(
|
|
"
|
|
UPDATE sessions
|
|
SET last_login = $2, city = $3, country = $4, ip = $5, os = $6, platform = $7, user_agent = $8
|
|
WHERE (id = $1)
|
|
",
|
|
id as SessionId,
|
|
Utc::now(),
|
|
metadata.city,
|
|
metadata.country,
|
|
metadata.ip,
|
|
metadata.os,
|
|
metadata.platform,
|
|
metadata.user_agent,
|
|
)
|
|
.execute(&mut *transaction)
|
|
.await?;
|
|
}
|
|
|
|
use futures::TryStreamExt;
|
|
let expired_ids = sqlx::query!(
|
|
"
|
|
SELECT id, session, user_id
|
|
FROM sessions
|
|
WHERE refresh_expires <= NOW()
|
|
"
|
|
)
|
|
.fetch_many(&mut *transaction)
|
|
.try_filter_map(|e| async {
|
|
Ok(e.right()
|
|
.map(|x| (SessionId(x.id), x.session, UserId(x.user_id))))
|
|
})
|
|
.try_collect::<Vec<(SessionId, String, UserId)>>()
|
|
.await?;
|
|
|
|
for (id, session, user_id) in expired_ids {
|
|
clear_cache_sessions.push((Some(id), Some(session), Some(user_id)));
|
|
Session::remove(id, &mut transaction).await?;
|
|
}
|
|
|
|
Session::clear_cache(clear_cache_sessions, redis).await?;
|
|
|
|
let ids = pat_queue.iter().map(|id| id.0).collect_vec();
|
|
let clear_cache_pats = pat_queue
|
|
.into_iter()
|
|
.map(|id| (Some(id), None, None))
|
|
.collect_vec();
|
|
sqlx::query!(
|
|
"
|
|
UPDATE pats
|
|
SET last_used = $2
|
|
WHERE id IN
|
|
(SELECT * FROM UNNEST($1::bigint[]))
|
|
",
|
|
&ids[..],
|
|
Utc::now(),
|
|
)
|
|
.execute(&mut *transaction)
|
|
.await?;
|
|
|
|
PersonalAccessToken::clear_cache(clear_cache_pats, redis).await?;
|
|
|
|
update_oauth_access_token_last_used(oauth_access_token_queue, &mut transaction).await?;
|
|
|
|
transaction.commit().await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn update_oauth_access_token_last_used(
|
|
oauth_access_token_queue: HashSet<OAuthAccessTokenId>,
|
|
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
|
) -> Result<(), DatabaseError> {
|
|
let ids = oauth_access_token_queue.iter().map(|id| id.0).collect_vec();
|
|
sqlx::query!(
|
|
"
|
|
UPDATE oauth_access_tokens
|
|
SET last_used = $2
|
|
WHERE id IN
|
|
(SELECT * FROM UNNEST($1::bigint[]))
|
|
",
|
|
&ids[..],
|
|
Utc::now()
|
|
)
|
|
.execute(&mut **transaction)
|
|
.await?;
|
|
Ok(())
|
|
}
|