Allow multiple labrinth instances (#3360)

* Move a lot of scheduled tasks to be runnable from the command-line

* Use pubsub to handle sockets connected to multiple Labrinths

* Clippy fix

* Fix build and merge some stuff

* Fix build fmt
:

---------

Signed-off-by: Jai Agrawal <18202329+Geometrically@users.noreply.github.com>
Co-authored-by: Jai A <jaiagr+gpg@pm.me>
Co-authored-by: Jai Agrawal <18202329+Geometrically@users.noreply.github.com>
This commit is contained in:
Josiah Glosson
2025-03-15 09:28:20 -05:00
committed by GitHub
parent 84a9438a70
commit c998d2566e
21 changed files with 1056 additions and 692 deletions

View File

@@ -9,6 +9,10 @@ use crate::queue::socket::{
ActiveSocket, ActiveSockets, SocketId, TunnelSocketType,
};
use crate::routes::ApiError;
use crate::sync::friends::{RedisFriendsMessage, FRIENDS_CHANNEL_NAME};
use crate::sync::status::{
get_user_status, push_back_user_expiry, replace_user_status,
};
use actix_web::web::{Data, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_ws::Message;
@@ -19,10 +23,15 @@ use ariadne::networking::message::{
use ariadne::users::UserStatus;
use chrono::Utc;
use either::Either;
use futures_util::future::select;
use futures_util::{StreamExt, TryStreamExt};
use redis::AsyncCommands;
use serde::Deserialize;
use sqlx::PgPool;
use std::pin::pin;
use std::sync::atomic::Ordering;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::time::{sleep, Duration};
pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(ws_init);
@@ -62,6 +71,7 @@ pub async fn ws_init(
}
let user = User::from_full(db_user);
let user_id = user.id;
let (res, mut session, msg_stream) = match actix_ws::handle(&req, body) {
Ok(x) => x,
@@ -79,19 +89,32 @@ pub async fn ws_init(
.await?;
let friend_statuses = if !friends.is_empty() {
friends
.iter()
.filter_map(|x| {
db.get_status(
if x.user_id == user.id.into() {
x.friend_id
} else {
x.user_id
let db = db.clone();
let redis = redis.clone();
tokio_stream::iter(friends.iter())
.map(|x| {
let db = db.clone();
let redis = redis.clone();
async move {
async move {
get_user_status(
if x.user_id == user_id.into() {
x.friend_id
} else {
x.user_id
}
.into(),
&db,
&redis,
)
.await
}
.into(),
)
}
})
.buffer_unordered(16)
.filter_map(|x| x)
.collect::<Vec<_>>()
.await
} else {
Vec::new()
};
@@ -116,20 +139,42 @@ pub async fn ws_init(
#[cfg(debug_assertions)]
tracing::info!("Connection {socket_id} opened by {}", user.id);
broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate { status },
&pool,
&db,
Some(friends),
replace_user_status(None, Some(&status), &redis).await?;
broadcast_friends_message(
&redis,
RedisFriendsMessage::StatusUpdate { status },
)
.await?;
let (shutdown_sender, mut shutdown_receiver) =
tokio::sync::oneshot::channel::<()>();
{
let db = db.clone();
let redis = redis.clone();
actix_web::rt::spawn(async move {
while shutdown_receiver.try_recv() == Err(TryRecvError::Empty) {
sleep(Duration::from_secs(30)).await;
if let Some(socket) = db.sockets.get(&socket_id) {
let _ = socket.socket.clone().ping(&[]).await;
}
let _ = push_back_user_expiry(user_id, &redis).await;
}
});
}
let mut stream = msg_stream.into_stream();
actix_web::rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
loop {
let next = pin!(stream.next());
let timeout = pin!(sleep(Duration::from_secs(30)));
let futures_util::future::Either::Left((Some(msg), _)) =
select(next, timeout).await
else {
break;
};
let message = match msg {
Ok(Message::Text(text)) => {
ClientToServerMessage::deserialize(Either::Left(&text))
@@ -139,10 +184,7 @@ pub async fn ws_init(
ClientToServerMessage::deserialize(Either::Right(&bytes))
}
Ok(Message::Close(_)) => {
let _ = close_socket(socket_id, &pool, &db).await;
continue;
}
Ok(Message::Close(_)) => break,
Ok(Message::Ping(msg)) => {
if let Some(socket) = db.sockets.get(&socket_id) {
@@ -162,8 +204,7 @@ pub async fn ws_init(
#[cfg(debug_assertions)]
if !message.is_binary() {
tracing::info!(
"Received message from {socket_id}: {:?}",
message
"Received message from {socket_id}: {message:?}"
);
}
@@ -172,6 +213,8 @@ pub async fn ws_init(
if let Some(mut pair) = db.sockets.get_mut(&socket_id) {
let ActiveSocket { status, .. } = pair.value_mut();
let old_status = status.clone();
if status
.profile_name
.as_ref()
@@ -188,14 +231,17 @@ pub async fn ws_init(
// We drop the pair to avoid holding the lock for too long
drop(pair);
let _ = broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate {
let _ = replace_user_status(
Some(&old_status),
Some(&user_status),
&redis,
)
.await;
let _ = broadcast_friends_message(
&redis,
RedisFriendsMessage::StatusUpdate {
status: user_status,
},
&pool,
&db,
None,
)
.await;
}
@@ -247,12 +293,11 @@ pub async fn ws_init(
};
match tunnel_socket.socket_type {
TunnelSocketType::Listening => {
let _ = broadcast_friends(
let _ = broadcast_to_local_friends(
user.id,
ServerToClientMessage::FriendSocketStoppedListening { user: user.id },
&pool,
&db,
None,
)
.await;
}
@@ -308,25 +353,48 @@ pub async fn ws_init(
}
}
let _ = close_socket(socket_id, &pool, &db).await;
let _ = shutdown_sender.send(());
let _ = close_socket(socket_id, &pool, &db, &redis).await;
});
Ok(res)
}
pub async fn broadcast_friends(
pub async fn broadcast_friends_message(
redis: &RedisPool,
message: RedisFriendsMessage,
) -> Result<(), crate::database::models::DatabaseError> {
let _: () = redis
.pool
.get()
.await?
.publish(FRIENDS_CHANNEL_NAME, message)
.await?;
Ok(())
}
pub async fn broadcast_to_local_friends(
user_id: UserId,
message: ServerToClientMessage,
pool: &PgPool,
sockets: &ActiveSockets,
friends: Option<Vec<FriendItem>>,
) -> Result<(), crate::database::models::DatabaseError> {
broadcast_to_known_local_friends(
user_id,
message,
sockets,
FriendItem::get_user_friends(user_id.into(), Some(true), pool).await?,
)
.await
}
async fn broadcast_to_known_local_friends(
user_id: UserId,
message: ServerToClientMessage,
sockets: &ActiveSockets,
friends: Vec<FriendItem>,
) -> Result<(), crate::database::models::DatabaseError> {
// FIXME Probably shouldn't be using database errors for this. Maybe ApiError?
let friends = if let Some(friends) = friends {
friends
} else {
FriendItem::get_user_friends(user_id.into(), Some(true), pool).await?
};
for friend in friends {
let friend_id = if friend.user_id == user_id.into() {
@@ -387,6 +455,7 @@ pub async fn close_socket(
id: SocketId,
pool: &PgPool,
db: &ActiveSockets,
redis: &RedisPool,
) -> Result<(), crate::database::models::DatabaseError> {
if let Some((_, socket)) = db.sockets.remove(&id) {
let user_id = socket.status.user_id;
@@ -397,12 +466,10 @@ pub async fn close_socket(
let _ = socket.socket.close(None).await;
broadcast_friends(
user_id,
ServerToClientMessage::UserOffline { id: user_id },
pool,
db,
None,
replace_user_status(Some(&socket.status), None, redis).await?;
broadcast_friends_message(
redis,
RedisFriendsMessage::UserOffline { user: user_id },
)
.await?;
@@ -414,14 +481,13 @@ pub async fn close_socket(
};
match tunnel_socket.socket_type {
TunnelSocketType::Listening => {
let _ = broadcast_friends(
let _ = broadcast_to_local_friends(
user_id,
ServerToClientMessage::SocketClosed {
socket: owned_socket,
},
pool,
db,
None,
)
.await;
}