You've already forked AstralRinth
forked from didirus/AstralRinth
Fix clippy errors + lint, use turbo CI
This commit is contained in:
@@ -32,18 +32,20 @@ impl RedisPool {
|
||||
// testing pool uses a hashmap to mimic redis behaviour for very small data sizes (ie: tests)
|
||||
// PANICS: production pool will panic if redis url is not set
|
||||
pub fn new(meta_namespace: Option<String>) -> Self {
|
||||
let redis_pool = Config::from_url(dotenvy::var("REDIS_URL").expect("Redis URL not set"))
|
||||
.builder()
|
||||
.expect("Error building Redis pool")
|
||||
.max_size(
|
||||
dotenvy::var("DATABASE_MAX_CONNECTIONS")
|
||||
.ok()
|
||||
.and_then(|x| x.parse().ok())
|
||||
.unwrap_or(10000),
|
||||
)
|
||||
.runtime(Runtime::Tokio1)
|
||||
.build()
|
||||
.expect("Redis connection failed");
|
||||
let redis_pool = Config::from_url(
|
||||
dotenvy::var("REDIS_URL").expect("Redis URL not set"),
|
||||
)
|
||||
.builder()
|
||||
.expect("Error building Redis pool")
|
||||
.max_size(
|
||||
dotenvy::var("DATABASE_MAX_CONNECTIONS")
|
||||
.ok()
|
||||
.and_then(|x| x.parse().ok())
|
||||
.unwrap_or(10000),
|
||||
)
|
||||
.runtime(Runtime::Tokio1)
|
||||
.build()
|
||||
.expect("Redis connection failed");
|
||||
|
||||
RedisPool {
|
||||
pool: redis_pool,
|
||||
@@ -68,7 +70,14 @@ impl RedisPool {
|
||||
F: FnOnce(Vec<K>) -> Fut,
|
||||
Fut: Future<Output = Result<DashMap<K, T>, DatabaseError>>,
|
||||
T: Serialize + DeserializeOwned,
|
||||
K: Display + Hash + Eq + PartialEq + Clone + DeserializeOwned + Serialize + Debug,
|
||||
K: Display
|
||||
+ Hash
|
||||
+ Eq
|
||||
+ PartialEq
|
||||
+ Clone
|
||||
+ DeserializeOwned
|
||||
+ Serialize
|
||||
+ Debug,
|
||||
{
|
||||
Ok(self
|
||||
.get_cached_keys_raw(namespace, keys, closure)
|
||||
@@ -88,15 +97,28 @@ impl RedisPool {
|
||||
F: FnOnce(Vec<K>) -> Fut,
|
||||
Fut: Future<Output = Result<DashMap<K, T>, DatabaseError>>,
|
||||
T: Serialize + DeserializeOwned,
|
||||
K: Display + Hash + Eq + PartialEq + Clone + DeserializeOwned + Serialize + Debug,
|
||||
K: Display
|
||||
+ Hash
|
||||
+ Eq
|
||||
+ PartialEq
|
||||
+ Clone
|
||||
+ DeserializeOwned
|
||||
+ Serialize
|
||||
+ Debug,
|
||||
{
|
||||
self.get_cached_keys_raw_with_slug(namespace, None, false, keys, |ids| async move {
|
||||
Ok(closure(ids)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(key, val)| (key, (None::<String>, val)))
|
||||
.collect())
|
||||
})
|
||||
self.get_cached_keys_raw_with_slug(
|
||||
namespace,
|
||||
None,
|
||||
false,
|
||||
keys,
|
||||
|ids| async move {
|
||||
Ok(closure(ids)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(key, val)| (key, (None::<String>, val)))
|
||||
.collect())
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -113,7 +135,13 @@ impl RedisPool {
|
||||
Fut: Future<Output = Result<DashMap<K, (Option<S>, T)>, DatabaseError>>,
|
||||
T: Serialize + DeserializeOwned,
|
||||
I: Display + Hash + Eq + PartialEq + Clone + Debug,
|
||||
K: Display + Hash + Eq + PartialEq + Clone + DeserializeOwned + Serialize,
|
||||
K: Display
|
||||
+ Hash
|
||||
+ Eq
|
||||
+ PartialEq
|
||||
+ Clone
|
||||
+ DeserializeOwned
|
||||
+ Serialize,
|
||||
S: Display + Clone + DeserializeOwned + Serialize + Debug,
|
||||
{
|
||||
Ok(self
|
||||
@@ -143,7 +171,13 @@ impl RedisPool {
|
||||
Fut: Future<Output = Result<DashMap<K, (Option<S>, T)>, DatabaseError>>,
|
||||
T: Serialize + DeserializeOwned,
|
||||
I: Display + Hash + Eq + PartialEq + Clone + Debug,
|
||||
K: Display + Hash + Eq + PartialEq + Clone + DeserializeOwned + Serialize,
|
||||
K: Display
|
||||
+ Hash
|
||||
+ Eq
|
||||
+ PartialEq
|
||||
+ Clone
|
||||
+ DeserializeOwned
|
||||
+ Serialize,
|
||||
S: Display + Clone + DeserializeOwned + Serialize + Debug,
|
||||
{
|
||||
let connection = self.connect().await?.connection;
|
||||
@@ -158,7 +192,8 @@ impl RedisPool {
|
||||
}
|
||||
|
||||
let get_cached_values =
|
||||
|ids: DashMap<String, I>, mut connection: deadpool_redis::Connection| async move {
|
||||
|ids: DashMap<String, I>,
|
||||
mut connection: deadpool_redis::Connection| async move {
|
||||
let slug_ids = if let Some(slug_namespace) = slug_namespace {
|
||||
cmd("MGET")
|
||||
.arg(
|
||||
@@ -176,7 +211,7 @@ impl RedisPool {
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.query_async::<_, Vec<Option<String>>>(&mut connection)
|
||||
.query_async::<Vec<Option<String>>>(&mut connection)
|
||||
.await?
|
||||
.into_iter()
|
||||
.flatten()
|
||||
@@ -195,15 +230,23 @@ impl RedisPool {
|
||||
.map(|x| x.to_string())
|
||||
}))
|
||||
.chain(slug_ids)
|
||||
.map(|x| format!("{}_{namespace}:{x}", self.meta_namespace))
|
||||
.map(|x| {
|
||||
format!(
|
||||
"{}_{namespace}:{x}",
|
||||
self.meta_namespace
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.query_async::<_, Vec<Option<String>>>(&mut connection)
|
||||
.query_async::<Vec<Option<String>>>(&mut connection)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|x| {
|
||||
x.and_then(|val| serde_json::from_str::<RedisValue<T, K, S>>(&val).ok())
|
||||
.map(|val| (val.key.clone(), val))
|
||||
x.and_then(|val| {
|
||||
serde_json::from_str::<RedisValue<T, K, S>>(&val)
|
||||
.ok()
|
||||
})
|
||||
.map(|val| (val.key.clone(), val))
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
@@ -213,11 +256,14 @@ impl RedisPool {
|
||||
let current_time = Utc::now();
|
||||
let mut expired_values = HashMap::new();
|
||||
|
||||
let (cached_values_raw, mut connection, ids) = get_cached_values(ids, connection).await?;
|
||||
let (cached_values_raw, mut connection, ids) =
|
||||
get_cached_values(ids, connection).await?;
|
||||
let mut cached_values = cached_values_raw
|
||||
.into_iter()
|
||||
.filter_map(|(key, val)| {
|
||||
if Utc.timestamp_opt(val.iat + ACTUAL_EXPIRY, 0).unwrap() < current_time {
|
||||
if Utc.timestamp_opt(val.iat + ACTUAL_EXPIRY, 0).unwrap()
|
||||
< current_time
|
||||
{
|
||||
expired_values.insert(val.key.to_string(), val);
|
||||
|
||||
None
|
||||
@@ -244,7 +290,8 @@ impl RedisPool {
|
||||
if !ids.is_empty() {
|
||||
let mut pipe = redis::pipe();
|
||||
|
||||
let fetch_ids = ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
|
||||
let fetch_ids =
|
||||
ids.iter().map(|x| x.key().clone()).collect::<Vec<_>>();
|
||||
|
||||
fetch_ids.iter().for_each(|key| {
|
||||
pipe.atomic().set_options(
|
||||
@@ -257,7 +304,7 @@ impl RedisPool {
|
||||
);
|
||||
});
|
||||
let results = pipe
|
||||
.query_async::<_, Vec<Option<i32>>>(&mut connection)
|
||||
.query_async::<Vec<Option<i32>>>(&mut connection)
|
||||
.await?;
|
||||
|
||||
for (idx, key) in fetch_ids.into_iter().enumerate() {
|
||||
@@ -288,12 +335,22 @@ impl RedisPool {
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
let mut fetch_tasks: Vec<
|
||||
Pin<Box<dyn Future<Output = Result<HashMap<K, RedisValue<T, K, S>>, DatabaseError>>>>,
|
||||
Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
HashMap<K, RedisValue<T, K, S>>,
|
||||
DatabaseError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
> = Vec::new();
|
||||
|
||||
if !ids.is_empty() {
|
||||
fetch_tasks.push(Box::pin(async {
|
||||
let fetch_ids = ids.iter().map(|x| x.value().clone()).collect::<Vec<_>>();
|
||||
let fetch_ids =
|
||||
ids.iter().map(|x| x.value().clone()).collect::<Vec<_>>();
|
||||
|
||||
let vals = closure(fetch_ids).await?;
|
||||
let mut return_values = HashMap::new();
|
||||
@@ -309,7 +366,10 @@ impl RedisPool {
|
||||
};
|
||||
|
||||
pipe.atomic().set_ex(
|
||||
format!("{}_{namespace}:{key}", self.meta_namespace),
|
||||
format!(
|
||||
"{}_{namespace}:{key}",
|
||||
self.meta_namespace
|
||||
),
|
||||
serde_json::to_string(&value)?,
|
||||
DEFAULT_EXPIRY as u64,
|
||||
);
|
||||
@@ -347,23 +407,29 @@ impl RedisPool {
|
||||
let base62 = to_base62(value);
|
||||
ids.remove(&base62);
|
||||
|
||||
pipe.atomic()
|
||||
.del(format!("{}_{namespace}:{base62}/lock", self.meta_namespace));
|
||||
pipe.atomic().del(format!(
|
||||
"{}_{namespace}:{base62}/lock",
|
||||
self.meta_namespace
|
||||
));
|
||||
}
|
||||
|
||||
pipe.atomic()
|
||||
.del(format!("{}_{namespace}:{key}/lock", self.meta_namespace));
|
||||
pipe.atomic().del(format!(
|
||||
"{}_{namespace}:{key}/lock",
|
||||
self.meta_namespace
|
||||
));
|
||||
|
||||
return_values.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
for (key, _) in ids {
|
||||
pipe.atomic()
|
||||
.del(format!("{}_{namespace}:{key}/lock", self.meta_namespace));
|
||||
pipe.atomic().del(format!(
|
||||
"{}_{namespace}:{key}/lock",
|
||||
self.meta_namespace
|
||||
));
|
||||
}
|
||||
|
||||
pipe.query_async(&mut connection).await?;
|
||||
pipe.query_async::<()>(&mut connection).await?;
|
||||
|
||||
Ok(return_values)
|
||||
}));
|
||||
@@ -373,7 +439,8 @@ impl RedisPool {
|
||||
fetch_tasks.push(Box::pin(async {
|
||||
let mut connection = self.pool.get().await?;
|
||||
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(100));
|
||||
let mut interval =
|
||||
tokio::time::interval(Duration::from_millis(100));
|
||||
let start = Utc::now();
|
||||
loop {
|
||||
let results = cmd("MGET")
|
||||
@@ -381,11 +448,15 @@ impl RedisPool {
|
||||
subscribe_ids
|
||||
.iter()
|
||||
.map(|x| {
|
||||
format!("{}_{namespace}:{}/lock", self.meta_namespace, x.key())
|
||||
format!(
|
||||
"{}_{namespace}:{}/lock",
|
||||
self.meta_namespace,
|
||||
x.key()
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.query_async::<_, Vec<Option<String>>>(&mut connection)
|
||||
.query_async::<Vec<Option<String>>>(&mut connection)
|
||||
.await?;
|
||||
|
||||
if results.into_iter().all(|x| x.is_none()) {
|
||||
@@ -399,7 +470,8 @@ impl RedisPool {
|
||||
interval.tick().await;
|
||||
}
|
||||
|
||||
let (return_values, _, _) = get_cached_values(subscribe_ids, connection).await?;
|
||||
let (return_values, _, _) =
|
||||
get_cached_values(subscribe_ids, connection).await?;
|
||||
|
||||
Ok(return_values)
|
||||
}));
|
||||
@@ -436,7 +508,7 @@ impl RedisConnection {
|
||||
]
|
||||
.as_slice(),
|
||||
);
|
||||
redis_execute(&mut cmd, &mut self.connection).await?;
|
||||
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -468,7 +540,8 @@ impl RedisConnection {
|
||||
let mut cmd = cmd("GET");
|
||||
redis_args(
|
||||
&mut cmd,
|
||||
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)].as_slice(),
|
||||
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||
.as_slice(),
|
||||
);
|
||||
let res = redis_execute(&mut cmd, &mut self.connection).await?;
|
||||
Ok(res)
|
||||
@@ -488,16 +561,21 @@ impl RedisConnection {
|
||||
.and_then(|x| serde_json::from_str(&x).ok()))
|
||||
}
|
||||
|
||||
pub async fn delete<T1>(&mut self, namespace: &str, id: T1) -> Result<(), DatabaseError>
|
||||
pub async fn delete<T1>(
|
||||
&mut self,
|
||||
namespace: &str,
|
||||
id: T1,
|
||||
) -> Result<(), DatabaseError>
|
||||
where
|
||||
T1: Display,
|
||||
{
|
||||
let mut cmd = cmd("DEL");
|
||||
redis_args(
|
||||
&mut cmd,
|
||||
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)].as_slice(),
|
||||
vec![format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||
.as_slice(),
|
||||
);
|
||||
redis_execute(&mut cmd, &mut self.connection).await?;
|
||||
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -511,14 +589,15 @@ impl RedisConnection {
|
||||
if let Some(id) = id {
|
||||
redis_args(
|
||||
&mut cmd,
|
||||
[format!("{}_{}:{}", self.meta_namespace, namespace, id)].as_slice(),
|
||||
[format!("{}_{}:{}", self.meta_namespace, namespace, id)]
|
||||
.as_slice(),
|
||||
);
|
||||
any = true;
|
||||
}
|
||||
}
|
||||
|
||||
if any {
|
||||
redis_execute(&mut cmd, &mut self.connection).await?;
|
||||
redis_execute::<()>(&mut cmd, &mut self.connection).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -547,6 +626,6 @@ pub async fn redis_execute<T>(
|
||||
where
|
||||
T: redis::FromRedisValue,
|
||||
{
|
||||
let res = cmd.query_async::<_, T>(redis).await?;
|
||||
let res = cmd.query_async::<T>(redis).await?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user