From 1d193ed01b3b66b0dfbe3ace7f080fd1eb60310d Mon Sep 17 00:00:00 2001 From: aecsocket Date: Thu, 22 Jan 2026 16:46:37 +0000 Subject: [PATCH] More tracing spans for Labrinth Redis (#5182) * more tracing in redis ops * Improve Redis tracing * improve messages * make lpush and brpop use traced cmds --- Cargo.lock | 1 + apps/labrinth/Cargo.toml | 1 + .../src/database/{redis.rs => redis/mod.rs} | 147 ++++++++++-------- apps/labrinth/src/database/redis/util.rs | 93 +++++++++++ 4 files changed, 178 insertions(+), 64 deletions(-) rename apps/labrinth/src/database/{redis.rs => redis/mod.rs} (87%) create mode 100644 apps/labrinth/src/database/redis/util.rs diff --git a/Cargo.lock b/Cargo.lock index 3be3329d..888f952a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4523,6 +4523,7 @@ dependencies = [ "const_format", "dashmap", "deadpool-redis", + "derive_more 2.0.1", "dotenv-build", "dotenvy", "either", diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index cc0a3dc7..aa6bdb0b 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -27,6 +27,7 @@ async-stripe = { workspace = true, features = [ "connect", "webhook-events", ] } +derive_more = { workspace = true, features = ["deref", "deref_mut"]} async-trait = { workspace = true } base64 = { workspace = true } bitflags = { workspace = true } diff --git a/apps/labrinth/src/database/redis.rs b/apps/labrinth/src/database/redis/mod.rs similarity index 87% rename from apps/labrinth/src/database/redis.rs rename to apps/labrinth/src/database/redis/mod.rs index 02f0fe59..dd4e11e1 100644 --- a/apps/labrinth/src/database/redis.rs +++ b/apps/labrinth/src/database/redis/mod.rs @@ -5,10 +5,7 @@ use dashmap::DashMap; use deadpool_redis::{Config, Runtime}; use futures::future::Either; use prometheus::{IntGauge, Registry}; -use redis::{ - AsyncTypedCommands, Cmd, ExistenceCheck, SetExpiry, SetOptions, - ToRedisArgs, cmd, -}; +use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -17,6 +14,10 @@ use std::future::Future; use std::hash::Hash; use std::time::Duration; use std::time::Instant; +use tracing::{Instrument, info_span}; +use util::{cmd, redis_pipe}; + +pub mod util; const DEFAULT_EXPIRY: i64 = 60 * 60 * 12; // 12 hours const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes @@ -24,7 +25,7 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes #[derive(Clone)] pub struct RedisPool { pub url: String, - pub pool: deadpool_redis::Pool, + pub pool: util::InstrumentedPool, meta_namespace: String, } @@ -67,7 +68,7 @@ impl RedisPool { let pool = RedisPool { url, - pool, + pool: util::InstrumentedPool::new(pool), meta_namespace: meta_namespace.unwrap_or("".to_string()), }; @@ -269,12 +270,14 @@ impl RedisPool { return Ok(HashMap::new()); } - let get_cached_values = |ids: DashMap| async move { - let slug_ids = if let Some(slug_namespace) = slug_namespace { - let mut connection = self.pool.get().await?; - cmd("MGET") - .arg( - ids.iter() + let get_cached_values = |ids: DashMap| { + async move { + let slug_ids = if let Some(slug_namespace) = slug_namespace { + async { + let mut connection = self.pool.get().await?; + + let args = ids + .iter() .map(|x| { format!( "{}_{slug_namespace}:{}", @@ -286,45 +289,53 @@ impl RedisPool { } ) }) - .collect::>(), - ) + .collect::>(); + + let v = cmd("MGET") + .arg(&args) + .query_async::>>(&mut connection) + .await? + .into_iter() + .flatten() + .collect::>(); + Ok::<_, DatabaseError>(v) + } + .instrument(info_span!("get slug ids")) + .await? + } else { + Vec::new() + }; + + let mut connection = self.pool.get().await?; + let args = ids + .iter() + .map(|x| x.value().to_string()) + .chain(ids.iter().filter_map(|x| { + parse_base62(&x.value().to_string()) + .ok() + .map(|x| x.to_string()) + })) + .chain(slug_ids) + .map(|x| format!("{}_{namespace}:{x}", self.meta_namespace)) + .collect::>(); + + let cached_values = cmd("MGET") + .arg(&args) .query_async::>>(&mut connection) .await? .into_iter() - .flatten() - .collect::>() - } else { - Vec::new() - }; - - let mut connection = self.pool.get().await?; - let cached_values = cmd("MGET") - .arg( - ids.iter() - .map(|x| x.value().to_string()) - .chain(ids.iter().filter_map(|x| { - parse_base62(&x.value().to_string()) + .filter_map(|x| { + x.and_then(|val| { + serde_json::from_str::>(&val) .ok() - .map(|x| x.to_string()) - })) - .chain(slug_ids) - .map(|x| { - format!("{}_{namespace}:{x}", self.meta_namespace) }) - .collect::>(), - ) - .query_async::>>(&mut connection) - .await? - .into_iter() - .filter_map(|x| { - x.and_then(|val| { - serde_json::from_str::>(&val).ok() + .map(|val| (val.key.clone(), val)) }) - .map(|val| (val.key.clone(), val)) - }) - .collect::>(); + .collect::>(); - Ok::<_, DatabaseError>((cached_values, ids)) + Ok::<_, DatabaseError>((cached_values, ids)) + } + .instrument(info_span!("get cached values")) }; let current_time = Utc::now(); @@ -361,7 +372,7 @@ impl RedisPool { let subscribe_ids = DashMap::new(); if !ids.is_empty() { - let mut pipe = redis::pipe(); + let mut pipe = redis_pipe(); let fetch_ids = ids.iter().map(|x| x.key().clone()).collect::>(); @@ -424,7 +435,7 @@ impl RedisPool { let vals = closure(fetch_ids).await?; let mut return_values = HashMap::new(); - let mut pipe = redis::pipe(); + let mut pipe = redis_pipe(); if !vals.is_empty() { for (key, (slug, value)) in vals { let value = RedisValue { @@ -524,21 +535,21 @@ impl RedisPool { let results = { let acquire_start = Instant::now(); let mut connection = self.pool.get().await?; + let args = subscribe_ids + .iter() + .map(|x| { + format!( + "{}_{namespace}:{}/lock", + self.meta_namespace, + // We lowercase key because locks are stored in lowercase + x.key().to_lowercase() + ) + }) + .collect::>(); redis_budget += acquire_start.elapsed(); + cmd("MGET") - .arg( - subscribe_ids - .iter() - .map(|x| { - format!( - "{}_{namespace}:{}/lock", - self.meta_namespace, - // We lowercase key because locks are stored in lowercase - x.key().to_lowercase() - ) - }) - .collect::>(), - ) + .arg(&args) .query_async::>>(&mut connection) .await? }; @@ -749,10 +760,14 @@ impl RedisConnection { &mut self, namespace: &str, key: &str, - value: impl ToRedisArgs + Send + Sync, + value: impl ToRedisArgs + Send + Sync + Debug, ) -> Result<(), DatabaseError> { let key = format!("{}_{namespace}:{key}", self.meta_namespace); - self.connection.lpush(key, value).await?; + cmd("LPUSH") + .arg(key) + .arg(value) + .query_async::<()>(&mut self.connection) + .await?; Ok(()) } @@ -766,7 +781,11 @@ impl RedisConnection { let key = format!("{}_{namespace}:{key}", self.meta_namespace); // a timeout of 0 is infinite let timeout = timeout.unwrap_or(0.0); - let values = self.connection.brpop(key, timeout).await?; + let values = cmd("BRPOP") + .arg(key) + .arg(timeout) + .query_async(&mut self.connection) + .await?; Ok(values) } } @@ -780,14 +799,14 @@ pub struct RedisValue { val: T, } -pub fn redis_args(cmd: &mut Cmd, args: &[String]) { +pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) { for arg in args { cmd.arg(arg); } } pub async fn redis_execute( - cmd: &mut Cmd, + cmd: &mut util::InstrumentedCmd, redis: &mut deadpool_redis::Connection, ) -> Result where diff --git a/apps/labrinth/src/database/redis/util.rs b/apps/labrinth/src/database/redis/util.rs new file mode 100644 index 00000000..1980bc7b --- /dev/null +++ b/apps/labrinth/src/database/redis/util.rs @@ -0,0 +1,93 @@ +use std::fmt::Debug; + +use deadpool_redis::PoolError; +use derive_more::{Deref, DerefMut}; +use redis::{FromRedisValue, RedisResult, ToRedisArgs}; +use tracing::{Instrument, info_span}; + +#[derive(Debug, Clone, Deref, DerefMut)] +pub struct InstrumentedPool { + inner: deadpool_redis::Pool, +} + +impl InstrumentedPool { + pub fn new(inner: deadpool_redis::Pool) -> Self { + Self { inner } + } + + pub async fn get(&self) -> Result { + self.inner + .get() + .instrument(info_span!("get redis connection")) + .await + } +} + +pub fn redis_pipe() -> InstrumentedPipeline { + InstrumentedPipeline { + inner: redis::pipe(), + } +} + +#[derive(Clone, Deref, DerefMut)] +pub struct InstrumentedPipeline { + #[deref] + #[deref_mut] + inner: redis::Pipeline, +} + +impl InstrumentedPipeline { + pub fn atomic(&mut self) -> &mut Self { + self.inner.atomic(); + self + } + + #[inline] + pub async fn query_async( + &self, + con: &mut impl redis::aio::ConnectionLike, + ) -> RedisResult { + self.inner + .query_async(con) + .instrument(info_span!("execute redis pipeline")) + .await + } +} + +pub fn cmd(name: &str) -> InstrumentedCmd { + InstrumentedCmd { + inner: redis::cmd(name), + name: name.to_string(), + args: Vec::new(), + } +} + +pub struct InstrumentedCmd { + inner: redis::Cmd, + name: String, + args: Vec, +} + +impl InstrumentedCmd { + #[inline] + pub fn arg(&mut self, arg: T) -> &mut Self { + self.args.push(format!("{arg:?}")); + self.inner.arg(arg); + self + } + + #[inline] + pub async fn query_async( + &self, + con: &mut impl redis::aio::ConnectionLike, + ) -> RedisResult { + let span = info_span!( + "query_async", + // + db.system.name = "redis", + db.operation.name = self.name, + db.query.text = format!("{} {}", self.name, self.args.join(" ")), + ); + self.inner.query_async(con).instrument(span).await + } +}