Files
AstralRinth/apps/labrinth/src/sync/friends.rs
T
aecsocket e57c15b3ce Add SQLx operation tracing (#5223)
* wip: vendor sqlx-tracing

* (compiles) standardize pg types used

* more standardization

* general log message improvements

* wip: improve sqlx-tracing architecture

* unify sqlx::Executor type

* wip: try fix sqlx tracing

* wip: sqlx-tracing compiles

* so close

* it compiles

* fix ci
2026-01-28 13:38:57 +00:00

88 lines
2.7 KiB
Rust

use crate::database::PgPool;
use crate::queue::socket::ActiveSockets;
use crate::routes::internal::statuses::{
broadcast_to_local_friends, send_message_to_user,
};
use actix_web::web::Data;
use ariadne::ids::UserId;
use ariadne::networking::message::ServerToClientMessage;
use ariadne::users::UserStatus;
use redis::aio::PubSub;
use redis::{RedisWrite, ToRedisArgs};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
pub const FRIENDS_CHANNEL_NAME: &str = "friends";
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RedisFriendsMessage {
StatusUpdate { status: UserStatus },
UserOffline { user: UserId },
DirectStatusUpdate { to_user: UserId, status: UserStatus },
}
impl ToRedisArgs for RedisFriendsMessage {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
out.write_arg(&serde_json::to_vec(&self).unwrap())
}
}
pub async fn handle_pubsub(
mut pubsub: PubSub,
pool: PgPool,
sockets: Data<ActiveSockets>,
) {
pubsub.subscribe(FRIENDS_CHANNEL_NAME).await.unwrap();
let mut stream = pubsub.into_on_message();
while let Some(message) = stream.next().await {
if message.get_channel_name() != FRIENDS_CHANNEL_NAME {
continue;
}
let payload = serde_json::from_slice(message.get_payload_bytes());
let pool = pool.clone();
let sockets = sockets.clone();
actix_rt::spawn(async move {
match payload {
Ok(RedisFriendsMessage::StatusUpdate { status }) => {
let _ = broadcast_to_local_friends(
status.user_id,
ServerToClientMessage::StatusUpdate { status },
&pool,
&sockets,
)
.await;
}
Ok(RedisFriendsMessage::UserOffline { user }) => {
let _ = broadcast_to_local_friends(
user,
ServerToClientMessage::UserOffline { id: user },
&pool,
&sockets,
)
.await;
}
Ok(RedisFriendsMessage::DirectStatusUpdate {
to_user,
status,
}) => {
let _ = send_message_to_user(
&sockets,
to_user,
&ServerToClientMessage::StatusUpdate { status },
)
.await;
}
Err(_) => {}
}
});
}
}