Friends system for app (#2958)

* Friends system for app

* Fix impl issues

* move friends to in-memory store
This commit is contained in:
Geometrically
2024-11-26 18:23:29 -07:00
committed by GitHub
parent 7184c5f5c7
commit 47b0ccdf78
46 changed files with 1078 additions and 539 deletions

View File

@@ -18,7 +18,7 @@ const FLOWS_NAMESPACE: &str = "flows";
pub enum Flow {
OAuth {
user_id: Option<UserId>,
url: Option<String>,
url: String,
provider: AuthProvider,
},
Login2FA {

View File

@@ -0,0 +1,132 @@
use crate::database::models::UserId;
use chrono::{DateTime, Utc};
pub struct FriendItem {
pub user_id: UserId,
pub friend_id: UserId,
pub created: DateTime<Utc>,
pub accepted: bool,
}
impl FriendItem {
pub async fn insert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"
INSERT INTO friends (user_id, friend_id, created, accepted)
VALUES ($1, $2, $3, $4)
",
self.user_id.0,
self.friend_id.0,
self.created,
self.accepted,
)
.execute(&mut **transaction)
.await?;
Ok(())
}
pub async fn get_friend<'a, E>(
user_id: UserId,
friend_id: UserId,
exec: E,
) -> Result<Option<FriendItem>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let friend = sqlx::query!(
"
SELECT f.user_id, f.friend_id, f.created, f.accepted
FROM friends f
WHERE (f.user_id = $1 AND f.friend_id = $2) OR (f.user_id = $2 AND f.friend_id = $1)
",
user_id.0,
friend_id.0,
)
.fetch_optional(exec)
.await?
.map(|row| FriendItem {
user_id: UserId(row.user_id),
friend_id: UserId(row.friend_id),
created: row.created,
accepted: row.accepted,
});
Ok(friend)
}
pub async fn update_friend(
user_id: UserId,
friend_id: UserId,
accepted: bool,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"
UPDATE friends
SET accepted = $3
WHERE (user_id = $1 AND friend_id = $2) OR (user_id = $2 AND friend_id = $1)
",
user_id.0,
friend_id.0,
accepted,
)
.execute(&mut **transaction)
.await?;
Ok(())
}
pub async fn get_user_friends<'a, E>(
user_id: UserId,
accepted: Option<bool>,
exec: E,
) -> Result<Vec<FriendItem>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let friends = sqlx::query!(
"
SELECT f.user_id, f.friend_id, f.created, f.accepted
FROM friends f
WHERE f.user_id = $1 OR f.friend_id = $1
",
user_id.0,
)
.fetch_all(exec)
.await?
.into_iter()
.map(|row| FriendItem {
user_id: UserId(row.user_id),
friend_id: UserId(row.friend_id),
created: row.created,
accepted: row.accepted,
})
.filter(|x| accepted.map(|y| y == x.accepted).unwrap_or(true))
.collect::<Vec<_>>();
Ok(friends)
}
pub async fn remove(
user_id: UserId,
friend_id: UserId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"
DELETE FROM friends
WHERE (user_id = $1 AND friend_id = $2) OR (user_id = $2 AND friend_id = $1)
",
user_id.0 as i64,
friend_id.0 as i64,
)
.execute(&mut **transaction)
.await?;
Ok(())
}
}

View File

@@ -4,6 +4,7 @@ pub mod categories;
pub mod charge_item;
pub mod collection_item;
pub mod flow_item;
pub mod friend_item;
pub mod ids;
pub mod image_item;
pub mod legacy_loader_fields;

View File

@@ -44,6 +44,8 @@ pub struct User {
pub created: DateTime<Utc>,
pub role: String,
pub badges: Badges,
pub allow_friend_requests: bool,
}
impl User {
@@ -58,13 +60,13 @@ impl User {
avatar_url, raw_avatar_url, bio, created,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,
email_verified, password, paypal_id, paypal_country, paypal_email,
venmo_handle, stripe_customer_id
venmo_handle, stripe_customer_id, allow_friend_requests
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7,
$8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19, $20
$14, $15, $16, $17, $18, $19, $20, $21
)
",
self.id as UserId,
@@ -86,7 +88,8 @@ impl User {
self.paypal_country,
self.paypal_email,
self.venmo_handle,
self.stripe_customer_id
self.stripe_customer_id,
self.allow_friend_requests,
)
.execute(&mut **transaction)
.await?;
@@ -172,7 +175,7 @@ impl User {
created, role, badges,
github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,
email_verified, password, totp_secret, paypal_id, paypal_country, paypal_email,
venmo_handle, stripe_customer_id
venmo_handle, stripe_customer_id, allow_friend_requests
FROM users
WHERE id = ANY($1) OR LOWER(username) = ANY($2)
",
@@ -205,6 +208,7 @@ impl User {
venmo_handle: u.venmo_handle,
stripe_customer_id: u.stripe_customer_id,
totp_secret: u.totp_secret,
allow_friend_requests: u.allow_friend_requests,
};
acc.insert(u.id, (Some(u.username), user));
@@ -643,6 +647,16 @@ impl User {
.execute(&mut **transaction)
.await?;
sqlx::query!(
"
DELETE FROM friends
WHERE user_id = $1 OR friend_id = $1
",
id as UserId,
)
.execute(&mut **transaction)
.await?;
sqlx::query!(
"
DELETE FROM user_backup_codes

View File

@@ -547,6 +547,23 @@ impl RedisConnection {
Ok(res)
}
pub async fn get_many(
&mut self,
namespace: &str,
ids: &[String],
) -> Result<Vec<Option<String>>, DatabaseError> {
let mut cmd = cmd("MGET");
redis_args(
&mut cmd,
ids.iter()
.map(|x| format!("{}_{}:{}", self.meta_namespace, namespace, x))
.collect::<Vec<_>>()
.as_slice(),
);
let res = redis_execute(&mut cmd, &mut self.connection).await?;
Ok(res)
}
pub async fn get_deserialized_from_json<R>(
&mut self,
namespace: &str,
@@ -561,6 +578,22 @@ impl RedisConnection {
.and_then(|x| serde_json::from_str(&x).ok()))
}
pub async fn get_many_deserialized_from_json<R>(
&mut self,
namespace: &str,
ids: &[String],
) -> Result<Vec<Option<R>>, DatabaseError>
where
R: for<'a> serde::Deserialize<'a>,
{
Ok(self
.get_many(namespace, ids)
.await?
.into_iter()
.map(|x| x.and_then(|val| serde_json::from_str::<R>(&val).ok()))
.collect::<Vec<_>>())
}
pub async fn delete<T1>(
&mut self,
namespace: &str,

View File

@@ -10,7 +10,6 @@ use queue::{
socket::ActiveSockets,
};
use sqlx::Postgres;
use tokio::sync::RwLock;
extern crate clickhouse as clickhouse_crate;
use clickhouse_crate::Client;
@@ -56,7 +55,7 @@ pub struct LabrinthConfig {
pub session_queue: web::Data<AuthQueue>,
pub payouts_queue: web::Data<PayoutsQueue>,
pub analytics_queue: Arc<AnalyticsQueue>,
pub active_sockets: web::Data<RwLock<ActiveSockets>>,
pub active_sockets: web::Data<ActiveSockets>,
pub automated_moderation_queue: web::Data<AutomatedModerationQueue>,
pub rate_limiter: KeyedRateLimiter,
pub stripe_client: stripe::Client,
@@ -303,7 +302,7 @@ pub fn app_setup(
};
let payouts_queue = web::Data::new(PayoutsQueue::new());
let active_sockets = web::Data::new(RwLock::new(ActiveSockets::default()));
let active_sockets = web::Data::new(ActiveSockets::default());
LabrinthConfig {
pool,

View File

@@ -52,6 +52,7 @@ pub struct User {
pub has_totp: Option<bool>,
pub payout_data: Option<UserPayoutData>,
pub stripe_customer_id: Option<String>,
pub allow_friend_requests: Option<bool>,
// DEPRECATED. Always returns None
pub github_id: Option<u64>,
@@ -85,6 +86,7 @@ impl From<DBUser> for User {
has_totp: None,
github_id: None,
stripe_customer_id: None,
allow_friend_requests: None,
}
}
}
@@ -136,6 +138,7 @@ impl User {
balance: Decimal::ZERO,
}),
stripe_customer_id: db_user.stripe_customer_id,
allow_friend_requests: Some(db_user.allow_friend_requests),
}
}
}
@@ -185,3 +188,29 @@ impl Role {
}
}
}
#[derive(Serialize, Deserialize)]
pub struct UserFriend {
pub id: UserId,
pub pending: bool,
pub created: DateTime<Utc>,
}
impl UserFriend {
pub fn from(
data: crate::database::models::friend_item::FriendItem,
) -> Self {
Self {
id: data.friend_id.into(),
pending: data.accepted,
created: data.created,
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct UserStatus {
pub user_id: UserId,
pub profile_name: Option<String>,
pub last_update: DateTime<Utc>,
}

View File

@@ -1,9 +1,10 @@
//! "Database" for Hydra
use crate::models::users::{UserId, UserStatus};
use actix_ws::Session;
use dashmap::DashMap;
pub struct ActiveSockets {
pub auth_sockets: DashMap<String, Session>,
pub auth_sockets: DashMap<UserId, (UserStatus, Session)>,
}
impl Default for ActiveSockets {

View File

@@ -9,7 +9,6 @@ use crate::models::ids::random_base62_rng;
use crate::models::pats::Scopes;
use crate::models::users::{Badges, Role};
use crate::queue::session::AuthQueue;
use crate::queue::socket::ActiveSockets;
use crate::routes::internal::session::issue_session;
use crate::routes::ApiError;
use crate::util::captcha::check_hcaptcha;
@@ -17,9 +16,8 @@ use crate::util::env::parse_strings_from_var;
use crate::util::ext::get_image_ext;
use crate::util::img::upload_image_optimized;
use crate::util::validate::{validation_errors_to_string, RE_URL_SAFE};
use actix_web::web::{scope, Data, Payload, Query, ServiceConfig};
use actix_web::web::{scope, Data, Query, ServiceConfig};
use actix_web::{delete, get, patch, post, web, HttpRequest, HttpResponse};
use actix_ws::Closed;
use argon2::password_hash::SaltString;
use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier};
use base64::Engine;
@@ -32,13 +30,11 @@ use sqlx::postgres::PgPool;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use validator::Validate;
pub fn config(cfg: &mut ServiceConfig) {
cfg.service(
scope("auth")
.service(ws_init)
.service(init)
.service(auth_callback)
.service(delete_auth_provider)
@@ -233,6 +229,7 @@ impl TempUser {
created: Utc::now(),
role: Role::Developer.to_string(),
badges: Badges::default(),
allow_friend_requests: true,
}
.insert(transaction)
.await?;
@@ -1090,7 +1087,7 @@ pub async fn init(
let state = Flow::OAuth {
user_id,
url: Some(info.url),
url: info.url,
provider: info.provider,
}
.insert(Duration::minutes(30), &redis)
@@ -1102,59 +1099,10 @@ pub async fn init(
.json(serde_json::json!({ "url": url })))
}
#[derive(Serialize, Deserialize)]
pub struct WsInit {
pub provider: AuthProvider,
}
#[get("ws")]
pub async fn ws_init(
req: HttpRequest,
Query(info): Query<WsInit>,
body: Payload,
db: Data<RwLock<ActiveSockets>>,
redis: Data<RedisPool>,
) -> Result<HttpResponse, actix_web::Error> {
let (res, session, _msg_stream) = actix_ws::handle(&req, body)?;
async fn sock(
mut ws_stream: actix_ws::Session,
info: WsInit,
db: Data<RwLock<ActiveSockets>>,
redis: Data<RedisPool>,
) -> Result<(), Closed> {
let flow = Flow::OAuth {
user_id: None,
url: None,
provider: info.provider,
}
.insert(Duration::minutes(30), &redis)
.await;
if let Ok(state) = flow {
if let Ok(url) = info.provider.get_redirect_url(state.clone()) {
ws_stream
.text(serde_json::json!({ "url": url }).to_string())
.await?;
let db = db.write().await;
db.auth_sockets.insert(state, ws_stream);
}
}
Ok(())
}
let _ = sock(session, info, db, redis).await;
Ok(res)
}
#[get("callback")]
pub async fn auth_callback(
req: HttpRequest,
Query(query): Query<HashMap<String, String>>,
active_sockets: Data<RwLock<ActiveSockets>>,
client: Data<PgPool>,
file_host: Data<Arc<dyn FileHost + Send + Sync>>,
redis: Data<RedisPool>,
@@ -1164,10 +1112,8 @@ pub async fn auth_callback(
.ok_or_else(|| AuthenticationError::InvalidCredentials)?
.clone();
let sockets = active_sockets.clone();
let state = state_string.clone();
let res: Result<HttpResponse, AuthenticationError> = async move {
let flow = Flow::get(&state, &redis).await?;
// Extract cookie header from request
@@ -1223,13 +1169,9 @@ pub async fn auth_callback(
transaction.commit().await?;
crate::database::models::User::clear_caches(&[(id, None)], &redis).await?;
if let Some(url) = url {
Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*url))
.json(serde_json::json!({ "url": url })))
} else {
Err(AuthenticationError::InvalidCredentials)
}
Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*url))
.json(serde_json::json!({ "url": url })))
} else {
let user_id = if let Some(user_id) = user_id_opt {
let user = crate::database::models::User::get_id(user_id, &**client, &redis)
@@ -1241,45 +1183,16 @@ pub async fn auth_callback(
.insert(Duration::minutes(30), &redis)
.await?;
if let Some(url) = url {
let redirect_url = format!(
"{}{}error=2fa_required&flow={}",
url,
if url.contains('?') { "&" } else { "?" },
flow
);
let redirect_url = format!(
"{}{}error=2fa_required&flow={}",
url,
if url.contains('?') { "&" } else { "?" },
flow
);
return Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*redirect_url))
.json(serde_json::json!({ "url": redirect_url })));
} else {
let mut ws_conn = {
let db = sockets.read().await;
let mut x = db
.auth_sockets
.get_mut(&state)
.ok_or_else(|| AuthenticationError::SocketError)?;
x.value_mut().clone()
};
ws_conn
.text(
serde_json::json!({
"error": "2fa_required",
"flow": flow,
}).to_string()
)
.await.map_err(|_| AuthenticationError::SocketError)?;
let _ = ws_conn.close(None).await;
return Ok(crate::auth::templates::Success {
icon: user.avatar_url.as_deref().unwrap_or("https://cdn-raw.modrinth.com/placeholder.svg"),
name: &user.username,
}.render());
}
return Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*redirect_url))
.json(serde_json::json!({ "url": redirect_url })));
}
user_id
@@ -1290,83 +1203,27 @@ pub async fn auth_callback(
let session = issue_session(req, user_id, &mut transaction, &redis).await?;
transaction.commit().await?;
if let Some(url) = url {
let redirect_url = format!(
"{}{}code={}{}",
url,
if url.contains('?') { '&' } else { '?' },
session.session,
if user_id_opt.is_none() {
"&new_account=true"
} else {
""
}
);
let redirect_url = format!(
"{}{}code={}{}",
url,
if url.contains('?') { '&' } else { '?' },
session.session,
if user_id_opt.is_none() {
"&new_account=true"
} else {
""
}
);
Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*redirect_url))
.json(serde_json::json!({ "url": redirect_url })))
} else {
let user = crate::database::models::user_item::User::get_id(
user_id,
&**client,
&redis,
)
.await?.ok_or_else(|| AuthenticationError::InvalidCredentials)?;
let mut ws_conn = {
let db = sockets.read().await;
let mut x = db
.auth_sockets
.get_mut(&state)
.ok_or_else(|| AuthenticationError::SocketError)?;
x.value_mut().clone()
};
ws_conn
.text(
serde_json::json!({
"code": session.session,
}).to_string()
)
.await.map_err(|_| AuthenticationError::SocketError)?;
let _ = ws_conn.close(None).await;
return Ok(crate::auth::templates::Success {
icon: user.avatar_url.as_deref().unwrap_or("https://cdn-raw.modrinth.com/placeholder.svg"),
name: &user.username,
}.render());
}
Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*redirect_url))
.json(serde_json::json!({ "url": redirect_url })))
}
} else {
Err::<HttpResponse, AuthenticationError>(AuthenticationError::InvalidCredentials)
}
}.await;
// Because this is callback route, if we have an error, we need to ensure we close the original socket if it exists
if let Err(ref e) = res {
let db = active_sockets.read().await;
let mut x = db.auth_sockets.get_mut(&state_string);
if let Some(x) = x.as_mut() {
let mut ws_conn = x.value_mut().clone();
ws_conn
.text(
serde_json::json!({
"error": &e.error_name(),
"description": &e.to_string(),
} )
.to_string(),
)
.await
.map_err(|_| AuthenticationError::SocketError)?;
let _ = ws_conn.close(None).await;
}
}
Ok(res?)
}
@@ -1557,6 +1414,7 @@ pub async fn create_account_with_password(
created: Utc::now(),
role: Role::Developer.to_string(),
badges: Badges::default(),
allow_friend_requests: true,
}
.insert(&mut transaction)
.await?;

View File

@@ -6,6 +6,8 @@ pub mod moderation;
pub mod pats;
pub mod session;
pub mod statuses;
use super::v3::oauth_clients;
pub use super::ApiError;
use crate::util::cors::default_cors;
@@ -21,6 +23,7 @@ pub fn config(cfg: &mut actix_web::web::ServiceConfig) {
.configure(pats::config)
.configure(moderation::config)
.configure(billing::config)
.configure(gdpr::config),
.configure(gdpr::config)
.configure(statuses::config),
);
}

View File

@@ -0,0 +1,255 @@
use crate::auth::validate::get_user_record_from_bearer_token;
use crate::auth::AuthenticationError;
use crate::database::models::friend_item::FriendItem;
use crate::database::redis::RedisPool;
use crate::models::ids::UserId;
use crate::models::pats::Scopes;
use crate::models::users::{User, UserStatus};
use crate::queue::session::AuthQueue;
use crate::queue::socket::ActiveSockets;
use crate::routes::ApiError;
use actix_web::web::{Data, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use chrono::Utc;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(ws_init);
}
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientToServerMessage {
StatusUpdate { profile_name: Option<String> },
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ServerToClientMessage {
StatusUpdate { status: UserStatus },
UserOffline { id: UserId },
FriendStatuses { statuses: Vec<UserStatus> },
FriendRequest { from: UserId },
}
#[derive(Deserialize)]
struct LauncherHeartbeatInit {
code: String,
}
#[get("launcher_heartbeat")]
pub async fn ws_init(
req: HttpRequest,
pool: Data<PgPool>,
web::Query(auth): web::Query<LauncherHeartbeatInit>,
body: Payload,
db: Data<ActiveSockets>,
redis: Data<RedisPool>,
session_queue: Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
let (scopes, db_user) = get_user_record_from_bearer_token(
&req,
Some(&auth.code),
&**pool,
&redis,
&session_queue,
)
.await?
.ok_or_else(|| {
ApiError::Authentication(AuthenticationError::InvalidCredentials)
})?;
if !scopes.contains(Scopes::SESSION_ACCESS) {
return Err(ApiError::Authentication(
AuthenticationError::InvalidCredentials,
));
}
let user = User::from_full(db_user);
if let Some((_, (_, session))) = db.auth_sockets.remove(&user.id) {
let _ = session.close(None).await;
}
let (res, mut session, msg_stream) = match actix_ws::handle(&req, body) {
Ok(x) => x,
Err(e) => return Ok(e.error_response()),
};
let status = UserStatus {
user_id: user.id,
profile_name: None,
last_update: Utc::now(),
};
let friends =
FriendItem::get_user_friends(user.id.into(), Some(true), &**pool)
.await?;
let friend_statuses = if !friends.is_empty() {
friends
.iter()
.filter_map(|x| {
db.auth_sockets.get(
&if x.user_id == user.id.into() {
x.friend_id
} else {
x.user_id
}
.into(),
)
})
.map(|x| x.value().0.clone())
.collect::<Vec<_>>()
} else {
Vec::new()
};
let _ = session
.text(serde_json::to_string(
&ServerToClientMessage::FriendStatuses {
statuses: friend_statuses,
},
)?)
.await;
db.auth_sockets.insert(user.id, (status.clone(), session));
broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate { status },
&pool,
&redis,
&db,
Some(friends),
)
.await?;
let mut stream = msg_stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
.max_continuation_size(2_usize.pow(20));
actix_web::rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
if let Ok(message) =
serde_json::from_str::<ClientToServerMessage>(&text)
{
match message {
ClientToServerMessage::StatusUpdate {
profile_name,
} => {
if let Some(mut pair) =
db.auth_sockets.get_mut(&user.id)
{
let (status, _) = pair.value_mut();
if status.profile_name.as_ref().map(|x| x.len() > 64).unwrap_or(false) {
continue;
}
status.profile_name = profile_name;
status.last_update = Utc::now();
let _ = broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate {
status: status.clone(),
},
&pool,
&redis,
&db,
None,
)
.await;
}
}
}
}
}
Ok(AggregatedMessage::Close(_)) => {
let _ = close_socket(user.id, &pool, &redis, &db).await;
}
_ => {}
}
}
});
Ok(res)
}
pub async fn broadcast_friends(
user_id: UserId,
message: ServerToClientMessage,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
friends: Option<Vec<FriendItem>>,
) -> Result<(), crate::database::models::DatabaseError> {
let friends = if let Some(friends) = friends {
friends
} else {
FriendItem::get_user_friends(user_id.into(), Some(true), pool).await?
};
for friend in friends {
let friend_id = if friend.user_id == user_id.into() {
friend.friend_id
} else {
friend.user_id
};
if friend.accepted {
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
{
let (_, socket) = socket.value_mut();
// TODO: bulk close sockets for better perf
if socket.text(serde_json::to_string(&message)?).await.is_err()
{
Box::pin(close_socket(
friend_id.into(),
pool,
redis,
sockets,
))
.await?;
}
}
}
}
Ok(())
}
pub async fn close_socket(
id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), crate::database::models::DatabaseError> {
if let Some((_, (_, socket))) = sockets.auth_sockets.remove(&id) {
let _ = socket.close(None).await;
}
broadcast_friends(
id,
ServerToClientMessage::UserOffline { id },
pool,
redis,
sockets,
None,
)
.await?;
Ok(())
}

View File

@@ -156,6 +156,7 @@ pub struct EditUser {
pub bio: Option<Option<String>>,
pub role: Option<Role>,
pub badges: Option<Badges>,
pub allow_friend_requests: Option<bool>,
}
#[patch("{id}")]
@@ -178,6 +179,7 @@ pub async fn user_edit(
role: new_user.role,
badges: new_user.badges,
venmo_handle: None,
allow_friend_requests: new_user.allow_friend_requests,
}),
pool,
redis,

View File

@@ -0,0 +1,242 @@
use crate::auth::get_user_from_headers;
use crate::database::models::UserId;
use crate::database::redis::RedisPool;
use crate::models::pats::Scopes;
use crate::models::users::UserFriend;
use crate::queue::session::AuthQueue;
use crate::queue::socket::ActiveSockets;
use crate::routes::internal::statuses::{close_socket, ServerToClientMessage};
use crate::routes::ApiError;
use actix_web::{delete, get, post, web, HttpRequest, HttpResponse};
use chrono::Utc;
use sqlx::PgPool;
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(add_friend);
cfg.service(remove_friend);
cfg.service(friends);
}
#[post("friend/{id}")]
pub async fn add_friend(
req: HttpRequest,
info: web::Path<(String,)>,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
db: web::Data<ActiveSockets>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
&**pool,
&redis,
&session_queue,
Some(&[Scopes::USER_WRITE]),
)
.await?
.1;
let string = info.into_inner().0;
let friend =
crate::database::models::User::get(&string, &**pool, &redis).await?;
if let Some(friend) = friend {
let mut transaction = pool.begin().await?;
if let Some(friend) =
crate::database::models::friend_item::FriendItem::get_friend(
user.id.into(),
friend.id,
&**pool,
)
.await?
{
if friend.accepted {
return Err(ApiError::InvalidInput(
"You are already friends with this user!".to_string(),
));
}
if !friend.accepted && user.id != friend.friend_id.into() {
return Err(ApiError::InvalidInput(
"You cannot accept your own friend request!".to_string(),
));
}
crate::database::models::friend_item::FriendItem::update_friend(
friend.user_id,
friend.friend_id,
true,
&mut transaction,
)
.await?;
async fn send_friend_status(
user_id: UserId,
friend_id: UserId,
pool: &PgPool,
redis: &RedisPool,
sockets: &ActiveSockets,
) -> Result<(), ApiError> {
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
let (friend_status, _) = pair.value();
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
{
let (_, socket) = socket.value_mut();
if socket
.text(serde_json::to_string(
&ServerToClientMessage::StatusUpdate {
status: friend_status.clone(),
},
)?)
.await
.is_err()
{
close_socket(
friend_id.into(),
pool,
redis,
sockets,
)
.await?;
}
}
}
Ok(())
}
send_friend_status(
friend.user_id,
friend.friend_id,
&pool,
&redis,
&db,
)
.await?;
send_friend_status(
friend.friend_id,
friend.user_id,
&pool,
&redis,
&db,
)
.await?;
} else {
if friend.id == user.id.into() {
return Err(ApiError::InvalidInput(
"You cannot add yourself as a friend!".to_string(),
));
}
if !friend.allow_friend_requests {
return Err(ApiError::InvalidInput(
"Friend requests are disabled for this user!".to_string(),
));
}
crate::database::models::friend_item::FriendItem {
user_id: user.id.into(),
friend_id: friend.id,
created: Utc::now(),
accepted: false,
}
.insert(&mut transaction)
.await?;
if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into())
{
let (_, socket) = socket.value_mut();
if socket
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequest { from: user.id },
)?)
.await
.is_err()
{
close_socket(user.id, &pool, &redis, &db).await?;
}
}
}
transaction.commit().await?;
Ok(HttpResponse::NoContent().body(""))
} else {
Err(ApiError::NotFound)
}
}
#[delete("friend/{id}")]
pub async fn remove_friend(
req: HttpRequest,
info: web::Path<(String,)>,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
&**pool,
&redis,
&session_queue,
Some(&[Scopes::USER_WRITE]),
)
.await?
.1;
let string = info.into_inner().0;
let friend =
crate::database::models::User::get(&string, &**pool, &redis).await?;
if let Some(friend) = friend {
let mut transaction = pool.begin().await?;
crate::database::models::friend_item::FriendItem::remove(
user.id.into(),
friend.id,
&mut transaction,
)
.await?;
transaction.commit().await?;
Ok(HttpResponse::NoContent().body(""))
} else {
Err(ApiError::NotFound)
}
}
#[get("friends")]
pub async fn friends(
req: HttpRequest,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
let user = get_user_from_headers(
&req,
&**pool,
&redis,
&session_queue,
Some(&[Scopes::USER_READ]),
)
.await?
.1;
let friends =
crate::database::models::friend_item::FriendItem::get_user_friends(
user.id.into(),
None,
&**pool,
)
.await?
.into_iter()
.map(UserFriend::from)
.collect::<Vec<_>>();
Ok(HttpResponse::Ok().json(friends))
}

View File

@@ -5,6 +5,7 @@ use serde_json::json;
pub mod analytics_get;
pub mod collections;
pub mod friends;
pub mod images;
pub mod notifications;
pub mod organizations;
@@ -42,7 +43,8 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.configure(users::config)
.configure(version_file::config)
.configure(payouts::config)
.configure(versions::config),
.configure(versions::config)
.configure(friends::config),
);
}

View File

@@ -300,6 +300,7 @@ pub struct EditUser {
pub badges: Option<Badges>,
#[validate(length(max = 160))]
pub venmo_handle: Option<String>,
pub allow_friend_requests: Option<bool>,
}
pub async fn user_edit(
@@ -438,6 +439,20 @@ pub async fn user_edit(
.await?;
}
if let Some(allow_friend_requests) = &user.allow_friend_requests {
sqlx::query!(
"
UPDATE users
SET allow_friend_requests = $1
WHERE (id = $2)
",
allow_friend_requests,
id as crate::database::models::ids::UserId,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
User::clear_caches(&[(id, Some(actual_user.username))], &redis)
.await?;