diff --git a/Cargo.lock b/Cargo.lock
index 8696daeb..888f952a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1809,12 +1809,6 @@ version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
 
-[[package]]
-name = "crc16"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
-
 [[package]]
 name = "crc32fast"
 version = "1.5.0"
@@ -7035,16 +7029,12 @@ dependencies = [
  "bytes",
  "cfg-if",
  "combine",
- "crc16",
- "futures-sink",
  "futures-util",
  "itoa",
- "log",
  "num-bigint",
  "percent-encoding",
  "pin-project-lite",
  "r2d2",
- "rand 0.9.2",
  "ryu",
  "sha1_smol",
  "socket2 0.6.1",
diff --git a/Cargo.toml b/Cargo.toml
index b04c0ddf..f49a75fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -60,7 +60,7 @@ const_format = "0.2.34"
 daedalus = { path = "packages/daedalus" }
 dashmap = "6.1.0"
 data-url = "0.3.2"
-deadpool-redis = { version ="0.22.0", features = ["cluster-async"] }
+deadpool-redis = "0.22.0"
 derive_more = "2.0.1"
 directories = "6.0.0"
 dirs = "6.0.0"
diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs
index a2fbd06a..f3cedf47 100644
--- a/apps/labrinth/src/database/models/notification_item.rs
+++ b/apps/labrinth/src/database/models/notification_item.rs
@@ -557,10 +557,9 @@ impl DBNotification {
         let mut redis = redis.connect().await?;
 
         redis
-            .delete_many(
-                USER_NOTIFICATIONS_NAMESPACE,
-                user_ids.into_iter().map(|id| Some(id.0.to_string())),
-            )
+            .delete_many(user_ids.into_iter().map(|id| {
+                (USER_NOTIFICATIONS_NAMESPACE, Some(id.0.to_string()))
+            }))
             .await?;
 
         Ok(())
diff --git a/apps/labrinth/src/database/models/organization_item.rs b/apps/labrinth/src/database/models/organization_item.rs
index 88b77bff..56637d01 100644
--- a/apps/labrinth/src/database/models/organization_item.rs
+++ b/apps/labrinth/src/database/models/organization_item.rs
@@ -256,16 +256,15 @@ impl DBOrganization {
     ) -> Result<(), super::DatabaseError> {
         let mut redis = redis.connect().await?;
 
-        if let Some(slug) = slug {
-            redis
-                .delete(ORGANIZATIONS_TITLES_NAMESPACE, slug.to_lowercase())
-                .await?;
-        }
-
         redis
-            .delete(ORGANIZATIONS_NAMESPACE, id.0.to_string())
+            .delete_many([
+                (ORGANIZATIONS_NAMESPACE, Some(id.0.to_string())),
+                (
+                    ORGANIZATIONS_TITLES_NAMESPACE,
+                    slug.map(|x| x.to_lowercase()),
+                ),
+            ])
             .await?;
-
         Ok(())
     }
 }
diff --git a/apps/labrinth/src/database/models/pat_item.rs b/apps/labrinth/src/database/models/pat_item.rs
index f6f4aef8..735f7efd 100644
--- a/apps/labrinth/src/database/models/pat_item.rs
+++ b/apps/labrinth/src/database/models/pat_item.rs
@@ -209,26 +209,18 @@ impl DBPersonalAccessToken {
         }
 
         redis
-            .delete_many(
-                PATS_NAMESPACE,
-                clear_pats
-                    .iter()
-                    .map(|(x, _, _)| x.map(|i| i.0.to_string())),
-            )
-            .await?;
-        redis
-            .delete_many(
-                PATS_TOKENS_NAMESPACE,
-                clear_pats.iter().map(|(_, token, _)| token.clone()),
-            )
-            .await?;
-        redis
-            .delete_many(
-                PATS_USERS_NAMESPACE,
-                clear_pats
-                    .iter()
-                    .map(|(_, _, x)| x.map(|i| i.0.to_string())),
-            )
+            .delete_many(clear_pats.into_iter().flat_map(
+                |(id, token, user_id)| {
+                    [
+                        (PATS_NAMESPACE, id.map(|i| i.0.to_string())),
+                        (PATS_TOKENS_NAMESPACE, token),
+                        (
+                            PATS_USERS_NAMESPACE,
+                            user_id.map(|i| i.0.to_string()),
+                        ),
+                    ]
+                },
+            ))
             .await?;
 
         Ok(())
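The call sites above (notifications, organizations, personal access tokens — and the ones that follow) all target the reworked `RedisConnection::delete_many`, which now takes `(namespace, Option<key>)` pairs instead of one namespace plus an iterator of optional keys; the new signature appears further down in `apps/labrinth/src/database/redis/mod.rs`. A minimal sketch of the calling pattern, with a stub standing in for the real connection type and illustrative namespace strings:

```rust
// Stand-in for RedisConnection::delete_many: the real method batches every
// Some(_) entry into a single DEL command and skips None entries entirely.
fn delete_many<'a>(iter: impl IntoIterator<Item = (&'a str, Option<String>)>) {
    let keys: Vec<String> = iter
        .into_iter()
        .filter_map(|(namespace, key)| key.map(|k| format!("{namespace}:{k}")))
        .collect();
    if !keys.is_empty() {
        println!("DEL {}", keys.join(" "));
    }
}

fn main() {
    let id = 42u64;
    let slug: Option<&str> = Some("my-project");

    // One call covers both the id-keyed and the slug-keyed cache entries; a
    // missing slug simply contributes no key instead of needing an `if let`.
    delete_many([
        ("projects", Some(id.to_string())),
        ("projects_slugs", slug.map(|s| s.to_lowercase())),
    ]);
}
```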
diff --git a/apps/labrinth/src/database/models/project_item.rs b/apps/labrinth/src/database/models/project_item.rs
index fbcb3fb2..717ebf7e 100644
--- a/apps/labrinth/src/database/models/project_item.rs
+++ b/apps/labrinth/src/database/models/project_item.rs
@@ -953,20 +953,20 @@ impl DBProject {
     ) -> Result<(), DatabaseError> {
         let mut redis = redis.connect().await?;
 
-        redis.delete(PROJECTS_NAMESPACE, id.0.to_string()).await?;
-
-        if let Some(slug) = slug {
-            redis
-                .delete(PROJECTS_SLUGS_NAMESPACE, slug.to_lowercase())
-                .await?;
-        }
-
-        if clear_dependencies.unwrap_or(false) {
-            redis
-                .delete(PROJECTS_DEPENDENCIES_NAMESPACE, id.0.to_string())
-                .await?;
-        }
-
+        redis
+            .delete_many([
+                (PROJECTS_NAMESPACE, Some(id.0.to_string())),
+                (PROJECTS_SLUGS_NAMESPACE, slug.map(|x| x.to_lowercase())),
+                (
+                    PROJECTS_DEPENDENCIES_NAMESPACE,
+                    if clear_dependencies.unwrap_or(false) {
+                        Some(id.0.to_string())
+                    } else {
+                        None
+                    },
+                ),
+            ])
+            .await?;
         Ok(())
     }
 }
diff --git a/apps/labrinth/src/database/models/session_item.rs b/apps/labrinth/src/database/models/session_item.rs
index b3f4dee8..2b1bac1c 100644
--- a/apps/labrinth/src/database/models/session_item.rs
+++ b/apps/labrinth/src/database/models/session_item.rs
@@ -268,28 +268,19 @@ impl DBSession {
         }
 
         redis
-            .delete_many(
-                SESSIONS_NAMESPACE,
-                clear_sessions
-                    .iter()
-                    .map(|(x, _, _)| x.map(|x| x.0.to_string())),
-            )
+            .delete_many(clear_sessions.into_iter().flat_map(
+                |(id, session, user_id)| {
+                    [
+                        (SESSIONS_NAMESPACE, id.map(|i| i.0.to_string())),
+                        (SESSIONS_IDS_NAMESPACE, session),
+                        (
+                            SESSIONS_USERS_NAMESPACE,
+                            user_id.map(|i| i.0.to_string()),
+                        ),
+                    ]
+                },
+            ))
             .await?;
-        redis
-            .delete_many(
-                SESSIONS_IDS_NAMESPACE,
-                clear_sessions.iter().map(|(_, session, _)| session.clone()),
-            )
-            .await?;
-        redis
-            .delete_many(
-                SESSIONS_USERS_NAMESPACE,
-                clear_sessions
-                    .iter()
-                    .map(|(_, _, x)| x.map(|x| x.0.to_string())),
-            )
-            .await?;
-
         Ok(())
     }
 
diff --git a/apps/labrinth/src/database/models/user_item.rs b/apps/labrinth/src/database/models/user_item.rs
index 7032c2bb..b28e005c 100644
--- a/apps/labrinth/src/database/models/user_item.rs
+++ b/apps/labrinth/src/database/models/user_item.rs
@@ -470,16 +470,15 @@ impl DBUser {
         let mut redis = redis.connect().await?;
 
         redis
-            .delete_many(
-                USERS_NAMESPACE,
-                user_ids.iter().map(|(id, _)| Some(id.0.to_string())),
-            )
-            .await?;
-        redis
-            .delete_many(
-                USER_USERNAMES_NAMESPACE,
-                user_ids.iter().map(|(_, username)| username.clone()),
-            )
+            .delete_many(user_ids.iter().flat_map(|(id, username)| {
+                [
+                    (USERS_NAMESPACE, Some(id.0.to_string())),
+                    (
+                        USER_USERNAMES_NAMESPACE,
+                        username.clone().map(|i| i.to_lowercase()),
+                    ),
+                ]
+            }))
             .await?;
         Ok(())
     }
@@ -492,8 +491,9 @@
 
         redis
             .delete_many(
-                USERS_PROJECTS_NAMESPACE,
-                user_ids.iter().map(|id| Some(id.0.to_string())),
+                user_ids.iter().map(|id| {
+                    (USERS_PROJECTS_NAMESPACE, Some(id.0.to_string()))
+                }),
             )
             .await?;
 
diff --git a/apps/labrinth/src/database/models/version_item.rs b/apps/labrinth/src/database/models/version_item.rs
index 065631a8..d0ba9c0b 100644
--- a/apps/labrinth/src/database/models/version_item.rs
+++ b/apps/labrinth/src/database/models/version_item.rs
@@ -14,6 +14,7 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::iter;
 
 pub const VERSIONS_NAMESPACE: &str = "versions";
 const VERSION_FILES_NAMESPACE: &str = "versions_files";
@@ -913,21 +914,24 @@ impl DBVersion {
     ) -> Result<(), DatabaseError> {
         let mut redis = redis.connect().await?;
 
-        redis
-            .delete(VERSIONS_NAMESPACE, version.inner.id.0.to_string())
-            .await?;
-
         redis
             .delete_many(
-                VERSION_FILES_NAMESPACE,
-                version.files.iter().flat_map(|file| {
-                    file.hashes
-                        .iter()
-                        .map(|(algo, hash)| Some(format!("{algo}_{hash}")))
-                }),
+                iter::once((
+                    VERSIONS_NAMESPACE,
+                    Some(version.inner.id.0.to_string()),
+                ))
+                .chain(version.files.iter().flat_map(
+                    |file| {
+                        file.hashes.iter().map(|(algo, hash)| {
+                            (
+                                VERSION_FILES_NAMESPACE,
+                                Some(format!("{algo}_{hash}")),
+                            )
+                        })
+                    },
+                )),
             )
             .await?;
-
         Ok(())
     }
 }
diff --git a/apps/labrinth/src/database/redis/mod.rs b/apps/labrinth/src/database/redis/mod.rs
index ecba1e56..dd4e11e1 100644
--- a/apps/labrinth/src/database/redis/mod.rs
+++ b/apps/labrinth/src/database/redis/mod.rs
@@ -2,7 +2,7 @@ use super::models::DatabaseError;
 use ariadne::ids::base62_impl::{parse_base62, to_base62};
 use chrono::{TimeZone, Utc};
 use dashmap::DashMap;
-use deadpool_redis::cluster::{Config, Runtime};
+use deadpool_redis::{Config, Runtime};
 use futures::future::Either;
 use prometheus::{IntGauge, Registry};
 use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs};
@@ -24,13 +24,13 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes
 
 #[derive(Clone)]
 pub struct RedisPool {
-    pub urls: Vec<String>,
+    pub url: String,
     pub pool: util::InstrumentedPool,
     meta_namespace: String,
 }
 
 pub struct RedisConnection {
-    pub connection: deadpool_redis::cluster::Connection,
+    pub connection: deadpool_redis::Connection,
     meta_namespace: String,
 }
 
@@ -51,13 +51,8 @@ impl RedisPool {
             },
         );
 
-        let urls = dotenvy::var("REDIS_URLS")
-            .expect("Redis URL not set")
-            .split(',')
-            .map(String::from)
-            .collect::<Vec<_>>();
-
-        let pool = Config::from_urls(&*urls)
+        let url = dotenvy::var("REDIS_URL").expect("Redis URL not set");
+        let pool = Config::from_url(url.clone())
            .builder()
            .expect("Error building Redis pool")
            .max_size(
@@ -72,7 +67,7 @@
             .expect("Redis connection failed");
 
         let pool = RedisPool {
-            urls,
+            url,
             pool: util::InstrumentedPool::new(pool),
             meta_namespace: meta_namespace.unwrap_or("".to_string()),
         };
@@ -285,7 +280,7 @@
                 .iter()
                 .map(|x| {
                     format!(
-                        "{}_{slug_namespace}:{{ns:{namespace}}}:{}",
+                        "{}_{slug_namespace}:{}",
                         self.meta_namespace,
                         if case_sensitive {
                             x.value().to_string()
@@ -321,12 +316,7 @@
                     .map(|x| x.to_string())
             }))
             .chain(slug_ids)
-            .map(|x| {
-                format!(
-                    "{}_{namespace}:{{ns:{namespace}}}:{x}",
-                    self.meta_namespace
-                )
-            })
+            .map(|x| format!("{}_{namespace}:{x}", self.meta_namespace))
             .collect::<Vec<_>>();
 
         let cached_values = cmd("MGET")
@@ -388,10 +378,10 @@
             ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
 
         fetch_ids.iter().for_each(|key| {
-            pipe.set_options(
+            pipe.atomic().set_options(
                 // We store locks in lowercase because they are case insensitive
                 format!(
-                    "{}_{namespace}:{{ns:{namespace}}}:{}/lock",
+                    "{}_{namespace}:{}/lock",
                     self.meta_namespace,
                     key.to_lowercase()
                 ),
@@ -455,9 +445,9 @@
                     alias: slug.clone(),
                 };
 
-                pipe.set_ex(
+                pipe.atomic().set_ex(
                     format!(
-                        "{}_{namespace}:{{ns:{namespace}}}:{key}",
+                        "{}_{namespace}:{key}",
                         self.meta_namespace
                     ),
                     serde_json::to_string(&value)?,
@@ -474,17 +464,17 @@
                     slug.to_string().to_lowercase()
                 };
 
-                pipe.set_ex(
+                pipe.atomic().set_ex(
                     format!(
-                        "{}_{slug_namespace}:{{ns:{namespace}}}:{}",
+                        "{}_{slug_namespace}:{}",
                         self.meta_namespace, actual_slug
                     ),
                     key.to_string(),
                     DEFAULT_EXPIRY as u64,
                 );
 
-                pipe.del(format!(
-                    "{}_{namespace}:{{ns:{namespace}}}:{}/lock",
+                pipe.atomic().del(format!(
+                    "{}_{namespace}:{}/lock",
                     // Locks are stored in lowercase
                     self.meta_namespace,
                     actual_slug.to_lowercase()
@@ -499,16 +489,16 @@
                     let base62 = to_base62(value);
 
                     ids.remove(&base62);
-                    pipe.del(format!(
-                        "{}_{namespace}:{{ns:{namespace}}}:{}/lock",
+                    pipe.atomic().del(format!(
+                        "{}_{namespace}:{}/lock",
                         self.meta_namespace,
                         // Locks are stored in lowercase
                         base62.to_lowercase()
                     ));
                 }
 
-                pipe.del(format!(
-                    "{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
+                pipe.atomic().del(format!(
+                    "{}_{namespace}:{key}/lock",
                     self.meta_namespace
                 ));
 
@@ -517,13 +507,13 @@
         }
 
         for (key, _) in ids {
-            pipe.del(format!(
-                "{}_{namespace}:{{ns:{namespace}}}:{}/lock",
+            pipe.atomic().del(format!(
+                "{}_{namespace}:{}/lock",
                 self.meta_namespace,
                 key.to_lowercase()
             ));
-            pipe.del(format!(
-                "{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
+            pipe.atomic().del(format!(
+                "{}_{namespace}:{key}/lock",
                 self.meta_namespace
             ));
         }
@@ -549,7 +539,7 @@
                 .iter()
                 .map(|x| {
                     format!(
-                        "{}_{namespace}:{{ns:{namespace}}}:{}/lock",
+                        "{}_{namespace}:{}/lock",
                         self.meta_namespace,
                         // We lowercase key because locks are stored in lowercase
                         x.key().to_lowercase()
@@ -623,10 +613,7 @@
         redis_args(
             &mut cmd,
             vec![
-                format!(
-                    "{}_{}:{{ns:{namespace}}}:{}",
-                    self.meta_namespace, namespace, id
-                ),
+                format!("{}_{}:{}", self.meta_namespace, namespace, id),
                 data.to_string(),
                 "EX".to_string(),
                 expiry.unwrap_or(DEFAULT_EXPIRY).to_string(),
@@ -667,11 +654,8 @@
         let mut cmd = cmd("GET");
         redis_args(
             &mut cmd,
-            vec![format!(
-                "{}_{}:{{ns:{namespace}}}:{}",
-                self.meta_namespace, namespace, id
-            )]
-            .as_slice(),
+            vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
+                .as_slice(),
         );
         let res = redis_execute(&mut cmd, &mut self.connection).await?;
         Ok(res)
@@ -687,12 +671,7 @@
         redis_args(
             &mut cmd,
             ids.iter()
-                .map(|x| {
-                    format!(
-                        "{}_{}:{{ns:{namespace}}}:{}",
-                        self.meta_namespace, namespace, x
-                    )
-                })
+                .map(|x| format!("{}_{}:{}", self.meta_namespace, namespace, x))
                 .collect::<Vec<_>>()
                 .as_slice(),
         );
@@ -744,11 +723,8 @@
         let mut cmd = cmd("DEL");
         redis_args(
             &mut cmd,
-            vec![format!(
-                "{}_{}:{{ns:{namespace}}}:{}",
-                self.meta_namespace, namespace, id
-            )]
-            .as_slice(),
+            vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
+                .as_slice(),
         );
         redis_execute::<()>(&mut cmd, &mut self.connection).await?;
         Ok(())
@@ -757,20 +733,16 @@
     #[tracing::instrument(skip(self, iter))]
     pub async fn delete_many(
         &mut self,
-        namespace: &str,
-        iter: impl IntoIterator<Item = Option<String>>,
+        iter: impl IntoIterator<Item = (&str, Option<String>)>,
     ) -> Result<(), DatabaseError> {
         let mut cmd = cmd("DEL");
         let mut any = false;
-        for id in iter {
+        for (namespace, id) in iter {
             if let Some(id) = id {
                 redis_args(
                     &mut cmd,
-                    [format!(
-                        "{}_{}:{{ns:{namespace}}}:{}",
-                        self.meta_namespace, namespace, id
-                    )]
-                    .as_slice(),
+                    [format!("{}_{}:{}", self.meta_namespace, namespace, id)]
+                        .as_slice(),
                 );
                 any = true;
             }
@@ -790,10 +762,7 @@
         key: &str,
         value: impl ToRedisArgs + Send + Sync + Debug,
     ) -> Result<(), DatabaseError> {
-        let key = format!(
-            "{}_{namespace}:{{ns:{namespace}}}:{key}",
-            self.meta_namespace
-        );
+        let key = format!("{}_{namespace}:{key}", self.meta_namespace);
         cmd("LPUSH")
             .arg(key)
             .arg(value)
@@ -809,10 +778,7 @@
         key: &str,
         timeout: Option<f64>,
     ) -> Result<Option<String>, DatabaseError> {
-        let key = format!(
-            "{}_{namespace}:{{ns:{namespace}}}:{key}",
-            self.meta_namespace
-        );
+        let key = format!("{}_{namespace}:{key}", self.meta_namespace);
         // a timeout of 0 is infinite
         let timeout = timeout.unwrap_or(0.0);
         let values = cmd("BRPOP")
@@ -841,7 +807,7 @@ pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) {
 
 pub async fn redis_execute<T>(
     cmd: &mut util::InstrumentedCmd,
-    redis: &mut deadpool_redis::cluster::Connection,
+    redis: &mut deadpool_redis::Connection,
 ) -> Result<T, DatabaseError>
 where
     T: redis::FromRedisValue,
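Every key format in `redis/mod.rs` above loses its `{{ns:{namespace}}}` segment, which rendered to a literal `{ns:<namespace>}` hash tag. Redis Cluster hashes only the text between `{` and `}` when assigning a slot, so the tag pinned all keys of a namespace to one slot and kept multi-key operations (the MGETs, the batched DEL in `delete_many`, the pipelines) from failing with CROSSSLOT errors; on a single Redis node every key is local and the tag is pure overhead. A rough before/after sketch of the key shape, using made-up values rather than anything taken from the codebase:

```rust
fn main() {
    let meta_namespace = ""; // per-pool prefix; defaults to an empty string
    let namespace = "projects"; // cache namespace
    let id = "AABBCCDD"; // cached object id

    // Cluster-era key: only the `{ns:projects}` part is hashed for slot
    // assignment, so every "projects" key lands in the same cluster slot.
    let clustered =
        format!("{meta_namespace}_{namespace}:{{ns:{namespace}}}:{id}");

    // Single-node key after this change: no hash tag required.
    let flat = format!("{meta_namespace}_{namespace}:{id}");

    assert_eq!(clustered, "_projects:{ns:projects}:AABBCCDD");
    assert_eq!(flat, "_projects:AABBCCDD");
}
```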
format!("{}_{namespace}:{key}", self.meta_namespace); // a timeout of 0 is infinite let timeout = timeout.unwrap_or(0.0); let values = cmd("BRPOP") @@ -841,7 +807,7 @@ pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) { pub async fn redis_execute( cmd: &mut util::InstrumentedCmd, - redis: &mut deadpool_redis::cluster::Connection, + redis: &mut deadpool_redis::Connection, ) -> Result where T: redis::FromRedisValue, diff --git a/apps/labrinth/src/database/redis/util.rs b/apps/labrinth/src/database/redis/util.rs index 7d4fe032..1980bc7b 100644 --- a/apps/labrinth/src/database/redis/util.rs +++ b/apps/labrinth/src/database/redis/util.rs @@ -5,19 +5,17 @@ use derive_more::{Deref, DerefMut}; use redis::{FromRedisValue, RedisResult, ToRedisArgs}; use tracing::{Instrument, info_span}; -#[derive(/*Debug, */ Clone, Deref, DerefMut)] +#[derive(Debug, Clone, Deref, DerefMut)] pub struct InstrumentedPool { - inner: deadpool_redis::cluster::Pool, + inner: deadpool_redis::Pool, } impl InstrumentedPool { - pub fn new(inner: deadpool_redis::cluster::Pool) -> Self { + pub fn new(inner: deadpool_redis::Pool) -> Self { Self { inner } } - pub async fn get( - &self, - ) -> Result { + pub async fn get(&self) -> Result { self.inner .get() .instrument(info_span!("get redis connection")) diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 0f125215..4376f2c2 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -260,8 +260,7 @@ pub fn app_setup( { let pool = pool.clone(); - let redis_client = - redis::Client::open(redis_pool.urls[0].clone()).unwrap(); + let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap(); let sockets = active_sockets.clone(); actix_rt::spawn(async move { let pubsub = redis_client.get_async_pubsub().await.unwrap(); @@ -387,7 +386,7 @@ pub fn check_env_vars() -> bool { failed |= check_var::("MEILISEARCH_READ_ADDR"); failed |= check_var::("MEILISEARCH_WRITE_ADDRS"); failed |= check_var::("MEILISEARCH_KEY"); - failed |= check_var::("REDIS_URLS"); + failed |= check_var::("REDIS_URL"); failed |= check_var::("BIND_ADDR"); failed |= check_var::("SELF_ADDR"); diff --git a/apps/labrinth/src/queue/analytics.rs b/apps/labrinth/src/queue/analytics.rs index 2c4bdaee..bb0373cb 100644 --- a/apps/labrinth/src/queue/analytics.rs +++ b/apps/labrinth/src/queue/analytics.rs @@ -120,12 +120,7 @@ impl AnalyticsQueue { .arg( views_keys .iter() - .map(|x| { - format!( - "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}", - VIEWS_NAMESPACE, x.0, x.1 - ) - }) + .map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1)) .collect::>(), ) .query_async::>>(&mut redis) @@ -157,10 +152,7 @@ impl AnalyticsQueue { }; pipe.atomic().set_ex( - format!( - "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}", - VIEWS_NAMESPACE, key.0, key.1 - ), + format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1), new_count, 6 * 60 * 60, ); @@ -203,10 +195,7 @@ impl AnalyticsQueue { downloads_keys .iter() .map(|x| { - format!( - "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}", - DOWNLOADS_NAMESPACE, x.0, x.1 - ) + format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1) }) .collect::>(), ) @@ -230,10 +219,7 @@ impl AnalyticsQueue { }; pipe.atomic().set_ex( - format!( - "{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}", - DOWNLOADS_NAMESPACE, key.0, key.1 - ), + format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1), new_count, 6 * 60 * 60, );