You've already forked AstralRinth
forked from didirus/AstralRinth
Revert "Implement redis clustering (#5189)"
This reverts commit fb1050e409.
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -1809,12 +1809,6 @@ version = "2.4.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "crc16"
|
|
||||||
version = "0.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crc32fast"
|
name = "crc32fast"
|
||||||
version = "1.5.0"
|
version = "1.5.0"
|
||||||
@@ -7035,16 +7029,12 @@ dependencies = [
|
|||||||
"bytes",
|
"bytes",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"combine",
|
"combine",
|
||||||
"crc16",
|
|
||||||
"futures-sink",
|
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"itoa",
|
"itoa",
|
||||||
"log",
|
|
||||||
"num-bigint",
|
"num-bigint",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"r2d2",
|
"r2d2",
|
||||||
"rand 0.9.2",
|
|
||||||
"ryu",
|
"ryu",
|
||||||
"sha1_smol",
|
"sha1_smol",
|
||||||
"socket2 0.6.1",
|
"socket2 0.6.1",
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ const_format = "0.2.34"
|
|||||||
daedalus = { path = "packages/daedalus" }
|
daedalus = { path = "packages/daedalus" }
|
||||||
dashmap = "6.1.0"
|
dashmap = "6.1.0"
|
||||||
data-url = "0.3.2"
|
data-url = "0.3.2"
|
||||||
deadpool-redis = { version ="0.22.0", features = ["cluster-async"] }
|
deadpool-redis = "0.22.0"
|
||||||
derive_more = "2.0.1"
|
derive_more = "2.0.1"
|
||||||
directories = "6.0.0"
|
directories = "6.0.0"
|
||||||
dirs = "6.0.0"
|
dirs = "6.0.0"
|
||||||
|
|||||||
@@ -557,10 +557,9 @@ impl DBNotification {
|
|||||||
let mut redis = redis.connect().await?;
|
let mut redis = redis.connect().await?;
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(user_ids.into_iter().map(|id| {
|
||||||
USER_NOTIFICATIONS_NAMESPACE,
|
(USER_NOTIFICATIONS_NAMESPACE, Some(id.0.to_string()))
|
||||||
user_ids.into_iter().map(|id| Some(id.0.to_string())),
|
}))
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -256,16 +256,15 @@ impl DBOrganization {
|
|||||||
) -> Result<(), super::DatabaseError> {
|
) -> Result<(), super::DatabaseError> {
|
||||||
let mut redis = redis.connect().await?;
|
let mut redis = redis.connect().await?;
|
||||||
|
|
||||||
if let Some(slug) = slug {
|
|
||||||
redis
|
|
||||||
.delete(ORGANIZATIONS_TITLES_NAMESPACE, slug.to_lowercase())
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete(ORGANIZATIONS_NAMESPACE, id.0.to_string())
|
.delete_many([
|
||||||
|
(ORGANIZATIONS_NAMESPACE, Some(id.0.to_string())),
|
||||||
|
(
|
||||||
|
ORGANIZATIONS_TITLES_NAMESPACE,
|
||||||
|
slug.map(|x| x.to_lowercase()),
|
||||||
|
),
|
||||||
|
])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -209,26 +209,18 @@ impl DBPersonalAccessToken {
|
|||||||
}
|
}
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(clear_pats.into_iter().flat_map(
|
||||||
PATS_NAMESPACE,
|
|(id, token, user_id)| {
|
||||||
clear_pats
|
[
|
||||||
.iter()
|
(PATS_NAMESPACE, id.map(|i| i.0.to_string())),
|
||||||
.map(|(x, _, _)| x.map(|i| i.0.to_string())),
|
(PATS_TOKENS_NAMESPACE, token),
|
||||||
)
|
(
|
||||||
.await?;
|
PATS_USERS_NAMESPACE,
|
||||||
redis
|
user_id.map(|i| i.0.to_string()),
|
||||||
.delete_many(
|
),
|
||||||
PATS_TOKENS_NAMESPACE,
|
]
|
||||||
clear_pats.iter().map(|(_, token, _)| token.clone()),
|
},
|
||||||
)
|
))
|
||||||
.await?;
|
|
||||||
redis
|
|
||||||
.delete_many(
|
|
||||||
PATS_USERS_NAMESPACE,
|
|
||||||
clear_pats
|
|
||||||
.iter()
|
|
||||||
.map(|(_, _, x)| x.map(|i| i.0.to_string())),
|
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -953,20 +953,20 @@ impl DBProject {
|
|||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let mut redis = redis.connect().await?;
|
let mut redis = redis.connect().await?;
|
||||||
|
|
||||||
redis.delete(PROJECTS_NAMESPACE, id.0.to_string()).await?;
|
redis
|
||||||
|
.delete_many([
|
||||||
if let Some(slug) = slug {
|
(PROJECTS_NAMESPACE, Some(id.0.to_string())),
|
||||||
redis
|
(PROJECTS_SLUGS_NAMESPACE, slug.map(|x| x.to_lowercase())),
|
||||||
.delete(PROJECTS_SLUGS_NAMESPACE, slug.to_lowercase())
|
(
|
||||||
.await?;
|
PROJECTS_DEPENDENCIES_NAMESPACE,
|
||||||
}
|
if clear_dependencies.unwrap_or(false) {
|
||||||
|
Some(id.0.to_string())
|
||||||
if clear_dependencies.unwrap_or(false) {
|
} else {
|
||||||
redis
|
None
|
||||||
.delete(PROJECTS_DEPENDENCIES_NAMESPACE, id.0.to_string())
|
},
|
||||||
.await?;
|
),
|
||||||
}
|
])
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -268,28 +268,19 @@ impl DBSession {
|
|||||||
}
|
}
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(clear_sessions.into_iter().flat_map(
|
||||||
SESSIONS_NAMESPACE,
|
|(id, session, user_id)| {
|
||||||
clear_sessions
|
[
|
||||||
.iter()
|
(SESSIONS_NAMESPACE, id.map(|i| i.0.to_string())),
|
||||||
.map(|(x, _, _)| x.map(|x| x.0.to_string())),
|
(SESSIONS_IDS_NAMESPACE, session),
|
||||||
)
|
(
|
||||||
|
SESSIONS_USERS_NAMESPACE,
|
||||||
|
user_id.map(|i| i.0.to_string()),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
},
|
||||||
|
))
|
||||||
.await?;
|
.await?;
|
||||||
redis
|
|
||||||
.delete_many(
|
|
||||||
SESSIONS_IDS_NAMESPACE,
|
|
||||||
clear_sessions.iter().map(|(_, session, _)| session.clone()),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
redis
|
|
||||||
.delete_many(
|
|
||||||
SESSIONS_USERS_NAMESPACE,
|
|
||||||
clear_sessions
|
|
||||||
.iter()
|
|
||||||
.map(|(_, _, x)| x.map(|x| x.0.to_string())),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -470,16 +470,15 @@ impl DBUser {
|
|||||||
let mut redis = redis.connect().await?;
|
let mut redis = redis.connect().await?;
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(user_ids.iter().flat_map(|(id, username)| {
|
||||||
USERS_NAMESPACE,
|
[
|
||||||
user_ids.iter().map(|(id, _)| Some(id.0.to_string())),
|
(USERS_NAMESPACE, Some(id.0.to_string())),
|
||||||
)
|
(
|
||||||
.await?;
|
USER_USERNAMES_NAMESPACE,
|
||||||
redis
|
username.clone().map(|i| i.to_lowercase()),
|
||||||
.delete_many(
|
),
|
||||||
USER_USERNAMES_NAMESPACE,
|
]
|
||||||
user_ids.iter().map(|(_, username)| username.clone()),
|
}))
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -492,8 +491,9 @@ impl DBUser {
|
|||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(
|
||||||
USERS_PROJECTS_NAMESPACE,
|
user_ids.iter().map(|id| {
|
||||||
user_ids.iter().map(|id| Some(id.0.to_string())),
|
(USERS_PROJECTS_NAMESPACE, Some(id.0.to_string()))
|
||||||
|
}),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use itertools::Itertools;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::iter;
|
||||||
|
|
||||||
pub const VERSIONS_NAMESPACE: &str = "versions";
|
pub const VERSIONS_NAMESPACE: &str = "versions";
|
||||||
const VERSION_FILES_NAMESPACE: &str = "versions_files";
|
const VERSION_FILES_NAMESPACE: &str = "versions_files";
|
||||||
@@ -913,21 +914,24 @@ impl DBVersion {
|
|||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let mut redis = redis.connect().await?;
|
let mut redis = redis.connect().await?;
|
||||||
|
|
||||||
redis
|
|
||||||
.delete(VERSIONS_NAMESPACE, version.inner.id.0.to_string())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.delete_many(
|
.delete_many(
|
||||||
VERSION_FILES_NAMESPACE,
|
iter::once((
|
||||||
version.files.iter().flat_map(|file| {
|
VERSIONS_NAMESPACE,
|
||||||
file.hashes
|
Some(version.inner.id.0.to_string()),
|
||||||
.iter()
|
))
|
||||||
.map(|(algo, hash)| Some(format!("{algo}_{hash}")))
|
.chain(version.files.iter().flat_map(
|
||||||
}),
|
|file| {
|
||||||
|
file.hashes.iter().map(|(algo, hash)| {
|
||||||
|
(
|
||||||
|
VERSION_FILES_NAMESPACE,
|
||||||
|
Some(format!("{algo}_{hash}")),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use super::models::DatabaseError;
|
|||||||
use ariadne::ids::base62_impl::{parse_base62, to_base62};
|
use ariadne::ids::base62_impl::{parse_base62, to_base62};
|
||||||
use chrono::{TimeZone, Utc};
|
use chrono::{TimeZone, Utc};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use deadpool_redis::cluster::{Config, Runtime};
|
use deadpool_redis::{Config, Runtime};
|
||||||
use futures::future::Either;
|
use futures::future::Either;
|
||||||
use prometheus::{IntGauge, Registry};
|
use prometheus::{IntGauge, Registry};
|
||||||
use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs};
|
use redis::{ExistenceCheck, SetExpiry, SetOptions, ToRedisArgs};
|
||||||
@@ -24,13 +24,13 @@ const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RedisPool {
|
pub struct RedisPool {
|
||||||
pub urls: Vec<String>,
|
pub url: String,
|
||||||
pub pool: util::InstrumentedPool,
|
pub pool: util::InstrumentedPool,
|
||||||
meta_namespace: String,
|
meta_namespace: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RedisConnection {
|
pub struct RedisConnection {
|
||||||
pub connection: deadpool_redis::cluster::Connection,
|
pub connection: deadpool_redis::Connection,
|
||||||
meta_namespace: String,
|
meta_namespace: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,13 +51,8 @@ impl RedisPool {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let urls = dotenvy::var("REDIS_URLS")
|
let url = dotenvy::var("REDIS_URL").expect("Redis URL not set");
|
||||||
.expect("Redis URL not set")
|
let pool = Config::from_url(url.clone())
|
||||||
.split(',')
|
|
||||||
.map(String::from)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let pool = Config::from_urls(&*urls)
|
|
||||||
.builder()
|
.builder()
|
||||||
.expect("Error building Redis pool")
|
.expect("Error building Redis pool")
|
||||||
.max_size(
|
.max_size(
|
||||||
@@ -72,7 +67,7 @@ impl RedisPool {
|
|||||||
.expect("Redis connection failed");
|
.expect("Redis connection failed");
|
||||||
|
|
||||||
let pool = RedisPool {
|
let pool = RedisPool {
|
||||||
urls,
|
url,
|
||||||
pool: util::InstrumentedPool::new(pool),
|
pool: util::InstrumentedPool::new(pool),
|
||||||
meta_namespace: meta_namespace.unwrap_or("".to_string()),
|
meta_namespace: meta_namespace.unwrap_or("".to_string()),
|
||||||
};
|
};
|
||||||
@@ -285,7 +280,7 @@ impl RedisPool {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
format!(
|
format!(
|
||||||
"{}_{slug_namespace}:{{ns:{namespace}}}:{}",
|
"{}_{slug_namespace}:{}",
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
if case_sensitive {
|
if case_sensitive {
|
||||||
x.value().to_string()
|
x.value().to_string()
|
||||||
@@ -321,12 +316,7 @@ impl RedisPool {
|
|||||||
.map(|x| x.to_string())
|
.map(|x| x.to_string())
|
||||||
}))
|
}))
|
||||||
.chain(slug_ids)
|
.chain(slug_ids)
|
||||||
.map(|x| {
|
.map(|x| format!("{}_{namespace}:{x}", self.meta_namespace))
|
||||||
format!(
|
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{x}",
|
|
||||||
self.meta_namespace
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let cached_values = cmd("MGET")
|
let cached_values = cmd("MGET")
|
||||||
@@ -388,10 +378,10 @@ impl RedisPool {
|
|||||||
ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
|
ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
|
||||||
|
|
||||||
fetch_ids.iter().for_each(|key| {
|
fetch_ids.iter().for_each(|key| {
|
||||||
pipe.set_options(
|
pipe.atomic().set_options(
|
||||||
// We store locks in lowercase because they are case insensitive
|
// We store locks in lowercase because they are case insensitive
|
||||||
format!(
|
format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
|
"{}_{namespace}:{}/lock",
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
key.to_lowercase()
|
key.to_lowercase()
|
||||||
),
|
),
|
||||||
@@ -455,9 +445,9 @@ impl RedisPool {
|
|||||||
alias: slug.clone(),
|
alias: slug.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
pipe.set_ex(
|
pipe.atomic().set_ex(
|
||||||
format!(
|
format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{key}",
|
"{}_{namespace}:{key}",
|
||||||
self.meta_namespace
|
self.meta_namespace
|
||||||
),
|
),
|
||||||
serde_json::to_string(&value)?,
|
serde_json::to_string(&value)?,
|
||||||
@@ -474,17 +464,17 @@ impl RedisPool {
|
|||||||
slug.to_string().to_lowercase()
|
slug.to_string().to_lowercase()
|
||||||
};
|
};
|
||||||
|
|
||||||
pipe.set_ex(
|
pipe.atomic().set_ex(
|
||||||
format!(
|
format!(
|
||||||
"{}_{slug_namespace}:{{ns:{namespace}}}:{}",
|
"{}_{slug_namespace}:{}",
|
||||||
self.meta_namespace, actual_slug
|
self.meta_namespace, actual_slug
|
||||||
),
|
),
|
||||||
key.to_string(),
|
key.to_string(),
|
||||||
DEFAULT_EXPIRY as u64,
|
DEFAULT_EXPIRY as u64,
|
||||||
);
|
);
|
||||||
|
|
||||||
pipe.del(format!(
|
pipe.atomic().del(format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
|
"{}_{namespace}:{}/lock",
|
||||||
// Locks are stored in lowercase
|
// Locks are stored in lowercase
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
actual_slug.to_lowercase()
|
actual_slug.to_lowercase()
|
||||||
@@ -499,16 +489,16 @@ impl RedisPool {
|
|||||||
let base62 = to_base62(value);
|
let base62 = to_base62(value);
|
||||||
ids.remove(&base62);
|
ids.remove(&base62);
|
||||||
|
|
||||||
pipe.del(format!(
|
pipe.atomic().del(format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
|
"{}_{namespace}:{}/lock",
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
// Locks are stored in lowercase
|
// Locks are stored in lowercase
|
||||||
base62.to_lowercase()
|
base62.to_lowercase()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
pipe.del(format!(
|
pipe.atomic().del(format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
|
"{}_{namespace}:{key}/lock",
|
||||||
self.meta_namespace
|
self.meta_namespace
|
||||||
));
|
));
|
||||||
|
|
||||||
@@ -517,13 +507,13 @@ impl RedisPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (key, _) in ids {
|
for (key, _) in ids {
|
||||||
pipe.del(format!(
|
pipe.atomic().del(format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
|
"{}_{namespace}:{}/lock",
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
key.to_lowercase()
|
key.to_lowercase()
|
||||||
));
|
));
|
||||||
pipe.del(format!(
|
pipe.atomic().del(format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{key}/lock",
|
"{}_{namespace}:{key}/lock",
|
||||||
self.meta_namespace
|
self.meta_namespace
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@@ -549,7 +539,7 @@ impl RedisPool {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
format!(
|
format!(
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{}/lock",
|
"{}_{namespace}:{}/lock",
|
||||||
self.meta_namespace,
|
self.meta_namespace,
|
||||||
// We lowercase key because locks are stored in lowercase
|
// We lowercase key because locks are stored in lowercase
|
||||||
x.key().to_lowercase()
|
x.key().to_lowercase()
|
||||||
@@ -623,10 +613,7 @@ impl RedisConnection {
|
|||||||
redis_args(
|
redis_args(
|
||||||
&mut cmd,
|
&mut cmd,
|
||||||
vec![
|
vec![
|
||||||
format!(
|
format!("{}_{}:{}", self.meta_namespace, namespace, id),
|
||||||
"{}_{}:{{ns:{namespace}}}:{}",
|
|
||||||
self.meta_namespace, namespace, id
|
|
||||||
),
|
|
||||||
data.to_string(),
|
data.to_string(),
|
||||||
"EX".to_string(),
|
"EX".to_string(),
|
||||||
expiry.unwrap_or(DEFAULT_EXPIRY).to_string(),
|
expiry.unwrap_or(DEFAULT_EXPIRY).to_string(),
|
||||||
@@ -667,11 +654,8 @@ impl RedisConnection {
|
|||||||
let mut cmd = cmd("GET");
|
let mut cmd = cmd("GET");
|
||||||
redis_args(
|
redis_args(
|
||||||
&mut cmd,
|
&mut cmd,
|
||||||
vec![format!(
|
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||||
"{}_{}:{{ns:{namespace}}}:{}",
|
.as_slice(),
|
||||||
self.meta_namespace, namespace, id
|
|
||||||
)]
|
|
||||||
.as_slice(),
|
|
||||||
);
|
);
|
||||||
let res = redis_execute(&mut cmd, &mut self.connection).await?;
|
let res = redis_execute(&mut cmd, &mut self.connection).await?;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
@@ -687,12 +671,7 @@ impl RedisConnection {
|
|||||||
redis_args(
|
redis_args(
|
||||||
&mut cmd,
|
&mut cmd,
|
||||||
ids.iter()
|
ids.iter()
|
||||||
.map(|x| {
|
.map(|x| format!("{}_{}:{}", self.meta_namespace, namespace, x))
|
||||||
format!(
|
|
||||||
"{}_{}:{{ns:{namespace}}}:{}",
|
|
||||||
self.meta_namespace, namespace, x
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.as_slice(),
|
.as_slice(),
|
||||||
);
|
);
|
||||||
@@ -744,11 +723,8 @@ impl RedisConnection {
|
|||||||
let mut cmd = cmd("DEL");
|
let mut cmd = cmd("DEL");
|
||||||
redis_args(
|
redis_args(
|
||||||
&mut cmd,
|
&mut cmd,
|
||||||
vec![format!(
|
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||||
"{}_{}:{{ns:{namespace}}}:{}",
|
.as_slice(),
|
||||||
self.meta_namespace, namespace, id
|
|
||||||
)]
|
|
||||||
.as_slice(),
|
|
||||||
);
|
);
|
||||||
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
|
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -757,20 +733,16 @@ impl RedisConnection {
|
|||||||
#[tracing::instrument(skip(self, iter))]
|
#[tracing::instrument(skip(self, iter))]
|
||||||
pub async fn delete_many(
|
pub async fn delete_many(
|
||||||
&mut self,
|
&mut self,
|
||||||
namespace: &str,
|
iter: impl IntoIterator<Item = (&str, Option<String>)>,
|
||||||
iter: impl IntoIterator<Item = Option<String>>,
|
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let mut cmd = cmd("DEL");
|
let mut cmd = cmd("DEL");
|
||||||
let mut any = false;
|
let mut any = false;
|
||||||
for id in iter {
|
for (namespace, id) in iter {
|
||||||
if let Some(id) = id {
|
if let Some(id) = id {
|
||||||
redis_args(
|
redis_args(
|
||||||
&mut cmd,
|
&mut cmd,
|
||||||
[format!(
|
[format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||||
"{}_{}:{{ns:{namespace}}}:{}",
|
.as_slice(),
|
||||||
self.meta_namespace, namespace, id
|
|
||||||
)]
|
|
||||||
.as_slice(),
|
|
||||||
);
|
);
|
||||||
any = true;
|
any = true;
|
||||||
}
|
}
|
||||||
@@ -790,10 +762,7 @@ impl RedisConnection {
|
|||||||
key: &str,
|
key: &str,
|
||||||
value: impl ToRedisArgs + Send + Sync + Debug,
|
value: impl ToRedisArgs + Send + Sync + Debug,
|
||||||
) -> Result<(), DatabaseError> {
|
) -> Result<(), DatabaseError> {
|
||||||
let key = format!(
|
let key = format!("{}_{namespace}:{key}", self.meta_namespace);
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{key}",
|
|
||||||
self.meta_namespace
|
|
||||||
);
|
|
||||||
cmd("LPUSH")
|
cmd("LPUSH")
|
||||||
.arg(key)
|
.arg(key)
|
||||||
.arg(value)
|
.arg(value)
|
||||||
@@ -809,10 +778,7 @@ impl RedisConnection {
|
|||||||
key: &str,
|
key: &str,
|
||||||
timeout: Option<f64>,
|
timeout: Option<f64>,
|
||||||
) -> Result<Option<[String; 2]>, DatabaseError> {
|
) -> Result<Option<[String; 2]>, DatabaseError> {
|
||||||
let key = format!(
|
let key = format!("{}_{namespace}:{key}", self.meta_namespace);
|
||||||
"{}_{namespace}:{{ns:{namespace}}}:{key}",
|
|
||||||
self.meta_namespace
|
|
||||||
);
|
|
||||||
// a timeout of 0 is infinite
|
// a timeout of 0 is infinite
|
||||||
let timeout = timeout.unwrap_or(0.0);
|
let timeout = timeout.unwrap_or(0.0);
|
||||||
let values = cmd("BRPOP")
|
let values = cmd("BRPOP")
|
||||||
@@ -841,7 +807,7 @@ pub fn redis_args(cmd: &mut util::InstrumentedCmd, args: &[String]) {
|
|||||||
|
|
||||||
pub async fn redis_execute<T>(
|
pub async fn redis_execute<T>(
|
||||||
cmd: &mut util::InstrumentedCmd,
|
cmd: &mut util::InstrumentedCmd,
|
||||||
redis: &mut deadpool_redis::cluster::Connection,
|
redis: &mut deadpool_redis::Connection,
|
||||||
) -> Result<T, deadpool_redis::PoolError>
|
) -> Result<T, deadpool_redis::PoolError>
|
||||||
where
|
where
|
||||||
T: redis::FromRedisValue,
|
T: redis::FromRedisValue,
|
||||||
|
|||||||
@@ -5,19 +5,17 @@ use derive_more::{Deref, DerefMut};
|
|||||||
use redis::{FromRedisValue, RedisResult, ToRedisArgs};
|
use redis::{FromRedisValue, RedisResult, ToRedisArgs};
|
||||||
use tracing::{Instrument, info_span};
|
use tracing::{Instrument, info_span};
|
||||||
|
|
||||||
#[derive(/*Debug, */ Clone, Deref, DerefMut)]
|
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||||
pub struct InstrumentedPool {
|
pub struct InstrumentedPool {
|
||||||
inner: deadpool_redis::cluster::Pool,
|
inner: deadpool_redis::Pool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InstrumentedPool {
|
impl InstrumentedPool {
|
||||||
pub fn new(inner: deadpool_redis::cluster::Pool) -> Self {
|
pub fn new(inner: deadpool_redis::Pool) -> Self {
|
||||||
Self { inner }
|
Self { inner }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(
|
pub async fn get(&self) -> Result<deadpool_redis::Connection, PoolError> {
|
||||||
&self,
|
|
||||||
) -> Result<deadpool_redis::cluster::Connection, PoolError> {
|
|
||||||
self.inner
|
self.inner
|
||||||
.get()
|
.get()
|
||||||
.instrument(info_span!("get redis connection"))
|
.instrument(info_span!("get redis connection"))
|
||||||
|
|||||||
@@ -260,8 +260,7 @@ pub fn app_setup(
|
|||||||
|
|
||||||
{
|
{
|
||||||
let pool = pool.clone();
|
let pool = pool.clone();
|
||||||
let redis_client =
|
let redis_client = redis::Client::open(redis_pool.url.clone()).unwrap();
|
||||||
redis::Client::open(redis_pool.urls[0].clone()).unwrap();
|
|
||||||
let sockets = active_sockets.clone();
|
let sockets = active_sockets.clone();
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
let pubsub = redis_client.get_async_pubsub().await.unwrap();
|
let pubsub = redis_client.get_async_pubsub().await.unwrap();
|
||||||
@@ -387,7 +386,7 @@ pub fn check_env_vars() -> bool {
|
|||||||
failed |= check_var::<String>("MEILISEARCH_READ_ADDR");
|
failed |= check_var::<String>("MEILISEARCH_READ_ADDR");
|
||||||
failed |= check_var::<String>("MEILISEARCH_WRITE_ADDRS");
|
failed |= check_var::<String>("MEILISEARCH_WRITE_ADDRS");
|
||||||
failed |= check_var::<String>("MEILISEARCH_KEY");
|
failed |= check_var::<String>("MEILISEARCH_KEY");
|
||||||
failed |= check_var::<String>("REDIS_URLS");
|
failed |= check_var::<String>("REDIS_URL");
|
||||||
failed |= check_var::<String>("BIND_ADDR");
|
failed |= check_var::<String>("BIND_ADDR");
|
||||||
failed |= check_var::<String>("SELF_ADDR");
|
failed |= check_var::<String>("SELF_ADDR");
|
||||||
|
|
||||||
|
|||||||
@@ -120,12 +120,7 @@ impl AnalyticsQueue {
|
|||||||
.arg(
|
.arg(
|
||||||
views_keys
|
views_keys
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| {
|
.map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
|
||||||
format!(
|
|
||||||
"{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
|
|
||||||
VIEWS_NAMESPACE, x.0, x.1
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
)
|
)
|
||||||
.query_async::<Vec<Option<u32>>>(&mut redis)
|
.query_async::<Vec<Option<u32>>>(&mut redis)
|
||||||
@@ -157,10 +152,7 @@ impl AnalyticsQueue {
|
|||||||
};
|
};
|
||||||
|
|
||||||
pipe.atomic().set_ex(
|
pipe.atomic().set_ex(
|
||||||
format!(
|
format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
|
||||||
"{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
|
|
||||||
VIEWS_NAMESPACE, key.0, key.1
|
|
||||||
),
|
|
||||||
new_count,
|
new_count,
|
||||||
6 * 60 * 60,
|
6 * 60 * 60,
|
||||||
);
|
);
|
||||||
@@ -203,10 +195,7 @@ impl AnalyticsQueue {
|
|||||||
downloads_keys
|
downloads_keys
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
format!(
|
format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1)
|
||||||
"{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
|
|
||||||
DOWNLOADS_NAMESPACE, x.0, x.1
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
)
|
)
|
||||||
@@ -230,10 +219,7 @@ impl AnalyticsQueue {
|
|||||||
};
|
};
|
||||||
|
|
||||||
pipe.atomic().set_ex(
|
pipe.atomic().set_ex(
|
||||||
format!(
|
format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
|
||||||
"{}:{{ns:{VIEWS_NAMESPACE}}}:{}-{}",
|
|
||||||
DOWNLOADS_NAMESPACE, key.0, key.1
|
|
||||||
),
|
|
||||||
new_count,
|
new_count,
|
||||||
6 * 60 * 60,
|
6 * 60 * 60,
|
||||||
);
|
);
|
||||||
|
|||||||
Reference in New Issue
Block a user