Implement redis clustering (#5189)

Co-authored-by: Jai Agrawal <geometrically@Jais-MacBook-Pro.local>
This commit is contained in:
Jai Agrawal
2026-01-23 04:51:17 -08:00
committed by GitHub
parent 5c29a8c7dd
commit fb1050e409
13 changed files with 200 additions and 124 deletions

View File

@@ -2,7 +2,7 @@ use super::models::DatabaseError;
use ariadne::ids::base62_impl::{parse_base62, to_base62};
use chrono::{TimeZone, Utc};
use dashmap::DashMap;
use deadpool_redis::{Config, Runtime};
use deadpool_redis::cluster::{Config, Runtime};
use futures::future::Either;
use prometheus::{IntGauge, Registry};
use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs};
@@ -24,13 +24,13 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes
#[derive(Clone)]
pub struct RedisPool {
pub url: String,
pub urls: Vec<String>,
pub pool: util::InstrumentedPool,
meta_namespace: String,
}
pub struct RedisConnection {
pub connection: deadpool_redis::Connection,
pub connection: deadpool_redis::cluster::Connection,
meta_namespace: String,
}
@@ -51,8 +51,13 @@ impl RedisPool {
},
);
let url = dotenvy::var("REDIS_URL").expect("Redis URL not set");
let pool = Config::from_url(url.clone())
let urls = dotenvy::var("REDIS_URLS")
.expect("Redis URL not set")
.split(',')
.map(String::from)
.collect::<Vec<_>>();
let pool = Config::from_urls(&*urls)
.builder()
.expect("Error building Redis pool")
.max_size(
@@ -67,7 +72,7 @@ impl RedisPool {
.expect("Redis connection failed");
let pool = RedisPool {
url,
urls,
pool: util::InstrumentedPool::new(pool),
meta_namespace: meta_namespace.unwrap_or("".to_string()),
};
@@ -280,7 +285,7 @@ impl RedisPool {
.iter()
.map(|x| {
format!(
"{}_{slug_namespace}:{}",
"{}_{slug_namespace}:{{ns:{namespace}}}:{}",
self.meta_namespace,
if case_sensitive {
x.value().to_string()
@@ -316,7 +321,12 @@ impl RedisPool {
.map(|x| x.to_string())
}))
.chain(slug_ids)
.map(|x| format!("{}_{namespace}:{x}", self.meta_namespace))
.map(|x| {
format!(
"{}_{namespace}:{{ns:{namespace}}}:{x}",
self.meta_namespace
)
})
.collect::<Vec<_>>();
let cached_values = cmd("MGET")
@@ -378,10 +388,10 @@ impl RedisPool {
ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
fetch_ids.iter().for_each(|key| {
pipe.atomic().set_options(
pipe.set_options(
// We store locks in lowercase because they are case insensitive
format!(
"{}_{namespace}:{}/lock",
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
self.meta_namespace,
key.to_lowercase()
),
@@ -445,9 +455,9 @@ impl RedisPool {
alias: slug.clone(),
};
pipe.atomic().set_ex(
pipe.set_ex(
format!(
"{}_{namespace}:{key}",
"{}_{namespace}:{{ns:{namespace}}}:{key}",
self.meta_namespace
),
serde_json::to_string(&value)?,
@@ -464,17 +474,17 @@ impl RedisPool {
slug.to_string().to_lowercase()
};
pipe.atomic().set_ex(
pipe.set_ex(
format!(
"{}_{slug_namespace}:{}",
"{}_{slug_namespace}:{{ns:{namespace}}}:{}",
self.meta_namespace, actual_slug
),
key.to_string(),
DEFAULT_EXPIRY as u64,
);
pipe.atomic().del(format!(
"{}_{namespace}:{}/lock",
pipe.del(format!(
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
// Locks are stored in lowercase
self.meta_namespace,
actual_slug.to_lowercase()
@@ -489,16 +499,16 @@ impl RedisPool {
let base62 = to_base62(value);
ids.remove(&base62);
pipe.atomic().del(format!(
"{}_{namespace}:{}/lock",
pipe.del(format!(
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
self.meta_namespace,
// Locks are stored in lowercase
base62.to_lowercase()
));
}
pipe.atomic().del(format!(
"{}_{namespace}:{key}/lock",
pipe.del(format!(
"{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
self.meta_namespace
));
@@ -507,13 +517,13 @@ impl RedisPool {
}
for (key, _) in ids {
pipe.atomic().del(format!(
"{}_{namespace}:{}/lock",
pipe.del(format!(
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
self.meta_namespace,
key.to_lowercase()
));
pipe.atomic().del(format!(
"{}_{namespace}:{key}/lock",
pipe.del(format!(
"{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
self.meta_namespace
));
}
@@ -539,7 +549,7 @@ impl RedisPool {
.iter()
.map(|x| {
format!(
"{}_{namespace}:{}/lock",
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
self.meta_namespace,
// We lowercase key because locks are stored in lowercase
x.key().to_lowercase()
@@ -613,7 +623,10 @@ impl RedisConnection {
redis_args(
&mut cmd,
vec![
format!("{}_{}:{}", self.meta_namespace, namespace, id),
format!(
"{}_{}:{{ns:{namespace}}}:{}",
self.meta_namespace, namespace, id
),
data.to_string(),
"EX".to_string(),
expiry.unwrap_or(DEFAULT_EXPIRY).to_string(),
@@ -654,8 +667,11 @@ impl RedisConnection {
let mut cmd = cmd("GET");
redis_args(
&mut cmd,
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
.as_slice(),
vec![format!(
"{}_{}:{{ns:{namespace}}}:{}",
self.meta_namespace, namespace, id
)]
.as_slice(),
);
let res = redis_execute(&mut cmd, &mut self.connection).await?;
Ok(res)
@@ -671,7 +687,12 @@ impl RedisConnection {
redis_args(
&mut cmd,
ids.iter()
.map(|x| format!("{}_{}:{}", self.meta_namespace, namespace, x))
.map(|x| {
format!(
"{}_{}:{{ns:{namespace}}}:{}",
self.meta_namespace, namespace, x
)
})
.collect::<Vec<_>>()
.as_slice(),
);
@@ -723,8 +744,11 @@ impl RedisConnection {
let mut cmd = cmd("DEL");
redis_args(
&mut cmd,
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
.as_slice(),
vec![format!(
"{}_{}:{{ns:{namespace}}}:{}",
self.meta_namespace, namespace, id
)]
.as_slice(),
);
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
Ok(())
@@ -733,16 +757,20 @@ impl RedisConnection {
#[tracing::instrument(skip(self, iter))]
pub async fn delete_many(
&mut self,
iter: impl IntoIterator<Item = (&str, Option<String>)>,
namespace: &str,
iter: impl IntoIterator<Item = Option<String>>,
) -> Result<(), DatabaseError> {
let mut cmd = cmd("DEL");
let mut any = false;
for (namespace, id) in iter {
for id in iter {
if let Some(id) = id {
redis_args(
&mut cmd,
[format!("{}_{}:{}", self.meta_namespace, namespace, id)]
.as_slice(),
[format!(
"{}_{}:{{ns:{namespace}}}:{}",
self.meta_namespace, namespace, id
)]
.as_slice(),
);
any = true;
}
@@ -762,7 +790,10 @@ impl RedisConnection {
key: &str,
value: impl ToRedisArgs + Send + Sync + Debug,
) -> Result<(), DatabaseError> {
let key = format!("{}_{namespace}:{key}", self.meta_namespace);
let key = format!(
"{}_{namespace}:{{ns:{namespace}}}:{key}",
self.meta_namespace
);
cmd("LPUSH")
.arg(key)
.arg(value)
@@ -778,7 +809,10 @@ impl RedisConnection {
key: &str,
timeout: Option<f64>,
) -> Result<Option<[String; 2]>, DatabaseError> {
let key = format!("{}_{namespace}:{key}", self.meta_namespace);
let key = format!(
"{}_{namespace}:{{ns:{namespace}}}:{key}",
self.meta_namespace
);
// a timeout of 0 is infinite
let timeout = timeout.unwrap_or(0.0);
let values = cmd("BRPOP")
@@ -807,7 +841,7 @@ pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) {
pub async fn redis_execute<T>(
cmd: &mut util::InstrumentedCmd,
redis: &mut deadpool_redis::Connection,
redis: &mut deadpool_redis::cluster::Connection,
) -> Result<T, deadpool_redis::PoolError>
where
T: redis::FromRedisValue,