PG/RedisPool configuration/o11y improvements (#5032)

* Don't retain Redis connections while doing database queries

* Optional REDIS_WAIT_TIMEOUT_MS

* Attach more data to CacheTimeout errrors

* Fix locks_released

* Fmt

* Set default REDIS_WAIT_TIMEOUT_MS to 15s

* Fix lint

* Close Redis connections idle for > 5 minutes

* Exponential backoff on cache spin lock
This commit is contained in:
François-Xavier Talbot
2026-01-08 14:47:13 -05:00
committed by GitHub
parent ea17534f77
commit ff222aa168
13 changed files with 269 additions and 130 deletions

View File

@@ -92,6 +92,7 @@ impl Category {
) -> Result<Vec<Category>, DatabaseError> ) -> Result<Vec<Category>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -102,6 +103,7 @@ impl Category {
if let Some(res) = res { if let Some(res) = res {
return Ok(res); return Ok(res);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -122,6 +124,8 @@ impl Category {
.try_collect::<Vec<Category>>() .try_collect::<Vec<Category>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json(TAGS_NAMESPACE, "category", &result, None) .set_serialized_to_json(TAGS_NAMESPACE, "category", &result, None)
.await?; .await?;
@@ -157,6 +161,7 @@ impl LinkPlatform {
) -> Result<Vec<LinkPlatform>, DatabaseError> ) -> Result<Vec<LinkPlatform>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -167,6 +172,7 @@ impl LinkPlatform {
if let Some(res) = res { if let Some(res) = res {
return Ok(res); return Ok(res);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -182,6 +188,8 @@ impl LinkPlatform {
.try_collect::<Vec<LinkPlatform>>() .try_collect::<Vec<LinkPlatform>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
TAGS_NAMESPACE, TAGS_NAMESPACE,
@@ -222,6 +230,7 @@ impl ReportType {
) -> Result<Vec<String>, DatabaseError> ) -> Result<Vec<String>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -232,6 +241,7 @@ impl ReportType {
if let Some(res) = res { if let Some(res) = res {
return Ok(res); return Ok(res);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -243,6 +253,8 @@ impl ReportType {
.try_collect::<Vec<String>>() .try_collect::<Vec<String>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
TAGS_NAMESPACE, TAGS_NAMESPACE,
@@ -283,6 +295,7 @@ impl ProjectType {
) -> Result<Vec<String>, DatabaseError> ) -> Result<Vec<String>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -293,6 +306,7 @@ impl ProjectType {
if let Some(res) = res { if let Some(res) = res {
return Ok(res); return Ok(res);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -304,6 +318,8 @@ impl ProjectType {
.try_collect::<Vec<String>>() .try_collect::<Vec<String>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
TAGS_NAMESPACE, TAGS_NAMESPACE,

View File

@@ -49,6 +49,7 @@ impl Game {
) -> Result<Vec<Game>, DatabaseError> ) -> Result<Vec<Game>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let cached_games: Option<Vec<Game>> = redis let cached_games: Option<Vec<Game>> = redis
@@ -57,6 +58,7 @@ impl Game {
if let Some(cached_games) = cached_games { if let Some(cached_games) = cached_games {
return Ok(cached_games); return Ok(cached_games);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -74,6 +76,8 @@ impl Game {
.try_collect::<Vec<Game>>() .try_collect::<Vec<Game>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
GAMES_LIST_NAMESPACE, GAMES_LIST_NAMESPACE,
@@ -105,6 +109,7 @@ impl Loader {
) -> Result<Option<LoaderId>, DatabaseError> ) -> Result<Option<LoaderId>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let cached_id: Option<i32> = let cached_id: Option<i32> =
@@ -112,6 +117,7 @@ impl Loader {
if let Some(cached_id) = cached_id { if let Some(cached_id) = cached_id {
return Ok(Some(LoaderId(cached_id))); return Ok(Some(LoaderId(cached_id)));
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -125,6 +131,7 @@ impl Loader {
.map(|r| LoaderId(r.id)); .map(|r| LoaderId(r.id));
if let Some(result) = result { if let Some(result) = result {
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json(LOADER_ID, name, &result.0, None) .set_serialized_to_json(LOADER_ID, name, &result.0, None)
.await?; .await?;
@@ -139,6 +146,7 @@ impl Loader {
) -> Result<Vec<Loader>, DatabaseError> ) -> Result<Vec<Loader>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let cached_loaders: Option<Vec<Loader>> = redis let cached_loaders: Option<Vec<Loader>> = redis
@@ -147,6 +155,7 @@ impl Loader {
if let Some(cached_loaders) = cached_loaders { if let Some(cached_loaders) = cached_loaders {
return Ok(cached_loaders); return Ok(cached_loaders);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -180,6 +189,8 @@ impl Loader {
.try_collect::<Vec<_>>() .try_collect::<Vec<_>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
LOADERS_LIST_NAMESPACE, LOADERS_LIST_NAMESPACE,
@@ -454,17 +465,19 @@ impl LoaderField {
) -> Result<Vec<LoaderField>, DatabaseError> ) -> Result<Vec<LoaderField>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let cached_fields: Option<Vec<LoaderField>> = redis let cached_fields: Option<Vec<LoaderField>> =
.get(LOADER_FIELDS_NAMESPACE_ALL, "") redis.get(LOADER_FIELDS_NAMESPACE_ALL, "").await?.and_then(
.await? |x| serde_json::from_str::<Vec<LoaderField>>(&x).ok(),
.and_then(|x| serde_json::from_str::<Vec<LoaderField>>(&x).ok()); );
if let Some(cached_fields) = cached_fields { if let Some(cached_fields) = cached_fields {
return Ok(cached_fields); return Ok(cached_fields);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -489,6 +502,8 @@ impl LoaderField {
.flatten() .flatten()
.collect(); .collect();
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
LOADER_FIELDS_NAMESPACE_ALL, LOADER_FIELDS_NAMESPACE_ALL,
@@ -509,6 +524,7 @@ impl LoaderFieldEnum {
) -> Result<Option<LoaderFieldEnum>, DatabaseError> ) -> Result<Option<LoaderFieldEnum>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -521,6 +537,7 @@ impl LoaderFieldEnum {
if let Some(cached_enum) = cached_enum { if let Some(cached_enum) = cached_enum {
return Ok(cached_enum); return Ok(cached_enum);
} }
}
let result = sqlx::query!( let result = sqlx::query!(
" "
@@ -540,6 +557,8 @@ impl LoaderFieldEnum {
hidable: l.hidable, hidable: l.hidable,
}); });
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
LOADER_FIELD_ENUMS_ID_NAMESPACE, LOADER_FIELD_ENUMS_ID_NAMESPACE,

View File

@@ -67,6 +67,13 @@ pub enum DatabaseError {
SerdeCacheError(#[from] serde_json::Error), SerdeCacheError(#[from] serde_json::Error),
#[error("Schema error: {0}")] #[error("Schema error: {0}")]
SchemaError(String), SchemaError(String),
#[error("Timeout when waiting for cache subscriber")] #[error(
CacheTimeout, "Timeout waiting on Redis cache lock ({locks_released}/{locks_waiting} released, spent {time_spent_pool_wait_ms}ms/{time_spent_total_ms}ms waiting on connections from pool)"
)]
CacheTimeout {
locks_released: usize,
locks_waiting: usize,
time_spent_pool_wait_ms: u64,
time_spent_total_ms: u64,
},
} }

View File

@@ -379,6 +379,7 @@ impl DBNotification {
) -> Result<Vec<DBNotification>, DatabaseError> ) -> Result<Vec<DBNotification>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -392,6 +393,7 @@ impl DBNotification {
if let Some(notifications) = cached_notifications { if let Some(notifications) = cached_notifications {
return Ok(notifications); return Ok(notifications);
} }
}
let db_notifications = sqlx::query!( let db_notifications = sqlx::query!(
" "
@@ -437,6 +439,8 @@ impl DBNotification {
.try_collect::<Vec<DBNotification>>() .try_collect::<Vec<DBNotification>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
USER_NOTIFICATIONS_NAMESPACE, USER_NOTIFICATIONS_NAMESPACE,

View File

@@ -52,15 +52,20 @@ impl NotificationTemplate {
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
redis: &RedisPool, redis: &RedisPool,
) -> Result<Vec<NotificationTemplate>, DatabaseError> { ) -> Result<Vec<NotificationTemplate>, DatabaseError> {
{
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let maybe_cached_templates = redis let maybe_cached_templates = redis
.get_deserialized_from_json(TEMPLATES_NAMESPACE, channel.as_str()) .get_deserialized_from_json(
TEMPLATES_NAMESPACE,
channel.as_str(),
)
.await?; .await?;
if let Some(cached) = maybe_cached_templates { if let Some(cached) = maybe_cached_templates {
return Ok(cached); return Ok(cached);
} }
}
let results = sqlx::query_as!( let results = sqlx::query_as!(
NotificationTemplateQueryResult, NotificationTemplateQueryResult,
@@ -74,6 +79,8 @@ impl NotificationTemplate {
let templates = results.into_iter().map(Into::into).collect(); let templates = results.into_iter().map(Into::into).collect();
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
TEMPLATES_NAMESPACE, TEMPLATES_NAMESPACE,

View File

@@ -38,6 +38,7 @@ impl NotificationTypeItem {
) -> Result<Vec<NotificationTypeItem>, DatabaseError> ) -> Result<Vec<NotificationTypeItem>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -48,6 +49,7 @@ impl NotificationTypeItem {
if let Some(types) = cached_types { if let Some(types) = cached_types {
return Ok(types); return Ok(types);
} }
}
let results = sqlx::query_as!( let results = sqlx::query_as!(
NotificationTypeQueryResult, NotificationTypeQueryResult,
@@ -58,6 +60,8 @@ impl NotificationTypeItem {
let types = results.into_iter().map(Into::into).collect(); let types = results.into_iter().map(Into::into).collect();
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
NOTIFICATION_TYPES_NAMESPACE, NOTIFICATION_TYPES_NAMESPACE,

View File

@@ -155,6 +155,7 @@ impl DBPersonalAccessToken {
) -> Result<Vec<DBPatId>, DatabaseError> ) -> Result<Vec<DBPatId>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -168,6 +169,7 @@ impl DBPersonalAccessToken {
if let Some(res) = res { if let Some(res) = res {
return Ok(res.into_iter().map(DBPatId).collect()); return Ok(res.into_iter().map(DBPatId).collect());
} }
}
let db_pats: Vec<DBPatId> = sqlx::query!( let db_pats: Vec<DBPatId> = sqlx::query!(
" "
@@ -183,6 +185,8 @@ impl DBPersonalAccessToken {
.try_collect::<Vec<DBPatId>>() .try_collect::<Vec<DBPatId>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set( .set(
PATS_USERS_NAMESPACE, PATS_USERS_NAMESPACE,

View File

@@ -149,6 +149,7 @@ impl QueryProductWithPrices {
) -> Result<Vec<Self>, DatabaseError> ) -> Result<Vec<Self>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -159,6 +160,7 @@ impl QueryProductWithPrices {
if let Some(res) = res { if let Some(res) = res {
return Ok(res); return Ok(res);
} }
}
let all_products = product_item::DBProduct::get_all(exec).await?; let all_products = product_item::DBProduct::get_all(exec).await?;
let prices = let prices =
@@ -191,6 +193,8 @@ impl QueryProductWithPrices {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None) .set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
.await?; .await?;

View File

@@ -893,6 +893,7 @@ impl DBProject {
Option<DBProjectId>, Option<DBProjectId>,
)>; )>;
{
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let dependencies = redis let dependencies = redis
@@ -904,6 +905,7 @@ impl DBProject {
if let Some(dependencies) = dependencies { if let Some(dependencies) = dependencies {
return Ok(dependencies); return Ok(dependencies);
} }
}
let dependencies: Dependencies = sqlx::query!( let dependencies: Dependencies = sqlx::query!(
" "
@@ -930,6 +932,8 @@ impl DBProject {
.try_collect::<Dependencies>() .try_collect::<Dependencies>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
PROJECTS_DEPENDENCIES_NAMESPACE, PROJECTS_DEPENDENCIES_NAMESPACE,

View File

@@ -208,6 +208,7 @@ impl DBSession {
) -> Result<Vec<DBSessionId>, DatabaseError> ) -> Result<Vec<DBSessionId>, DatabaseError>
where where
E: sqlx::Executor<'a, Database = sqlx::Postgres>, E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
{ {
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
@@ -221,6 +222,7 @@ impl DBSession {
if let Some(res) = res { if let Some(res) = res {
return Ok(res.into_iter().map(DBSessionId).collect()); return Ok(res.into_iter().map(DBSessionId).collect());
} }
}
use futures::TryStreamExt; use futures::TryStreamExt;
let db_sessions: Vec<DBSessionId> = sqlx::query!( let db_sessions: Vec<DBSessionId> = sqlx::query!(
@@ -237,6 +239,8 @@ impl DBSession {
.try_collect::<Vec<DBSessionId>>() .try_collect::<Vec<DBSessionId>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
SESSIONS_USERS_NAMESPACE, SESSIONS_USERS_NAMESPACE,

View File

@@ -299,6 +299,7 @@ impl DBUser {
{ {
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
{
let mut redis = redis.connect().await?; let mut redis = redis.connect().await?;
let cached_projects = redis let cached_projects = redis
@@ -311,6 +312,7 @@ impl DBUser {
if let Some(projects) = cached_projects { if let Some(projects) = cached_projects {
return Ok(projects); return Ok(projects);
} }
}
let db_projects = sqlx::query!( let db_projects = sqlx::query!(
" "
@@ -326,6 +328,8 @@ impl DBUser {
.try_collect::<Vec<DBProjectId>>() .try_collect::<Vec<DBProjectId>>()
.await?; .await?;
let mut redis = redis.connect().await?;
redis redis
.set_serialized_to_json( .set_serialized_to_json(
USERS_PROJECTS_NAMESPACE, USERS_PROJECTS_NAMESPACE,

View File

@@ -35,7 +35,21 @@ pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> {
info!("Initializing database connection"); info!("Initializing database connection");
let database_url = let database_url =
dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env"); dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` not in .env");
let acquire_timeout =
dotenvy::var("DATABASE_ACQUIRE_TIMEOUT_MS")
.ok()
.map_or_else(
|| Duration::from_millis(30000),
|x| {
Duration::from_millis(x.parse::<u64>().expect(
"DATABASE_ACQUIRE_TIMEOUT_MS must be a valid u64",
))
},
);
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.acquire_timeout(acquire_timeout)
.min_connections( .min_connections(
dotenvy::var("DATABASE_MIN_CONNECTIONS") dotenvy::var("DATABASE_MIN_CONNECTIONS")
.ok() .ok()
@@ -54,6 +68,7 @@ pub async fn connect_all() -> Result<(PgPool, ReadOnlyPgPool), sqlx::Error> {
if let Ok(url) = dotenvy::var("READONLY_DATABASE_URL") { if let Ok(url) = dotenvy::var("READONLY_DATABASE_URL") {
let ro_pool = PgPoolOptions::new() let ro_pool = PgPoolOptions::new()
.acquire_timeout(acquire_timeout)
.min_connections( .min_connections(
dotenvy::var("READONLY_DATABASE_MIN_CONNECTIONS") dotenvy::var("READONLY_DATABASE_MIN_CONNECTIONS")
.ok() .ok()

View File

@@ -16,6 +16,7 @@ use std::fmt::{Debug, Display};
use std::future::Future; use std::future::Future;
use std::hash::Hash; use std::hash::Hash;
use std::time::Duration; use std::time::Duration;
use std::time::Instant;
const DEFAULT_EXPIRY: i64 = 60 * 60 * 12; // 12 hours const DEFAULT_EXPIRY: i64 = 60 * 60 * 12; // 12 hours
const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes const ACTUAL_EXPIRY: i64 = 60 * 30; // 30 minutes
@@ -37,6 +38,18 @@ impl RedisPool {
// testing pool uses a hashmap to mimic redis behaviour for very small data sizes (ie: tests) // testing pool uses a hashmap to mimic redis behaviour for very small data sizes (ie: tests)
// PANICS: production pool will panic if redis url is not set // PANICS: production pool will panic if redis url is not set
pub fn new(meta_namespace: Option<String>) -> Self { pub fn new(meta_namespace: Option<String>) -> Self {
let wait_timeout =
dotenvy::var("REDIS_WAIT_TIMEOUT_MS").ok().map_or_else(
|| Duration::from_millis(15000),
|x| {
Duration::from_millis(
x.parse::<u64>().expect(
"REDIS_WAIT_TIMEOUT_MS must be a valid u64",
),
)
},
);
let url = dotenvy::var("REDIS_URL").expect("Redis URL not set"); let url = dotenvy::var("REDIS_URL").expect("Redis URL not set");
let pool = Config::from_url(url.clone()) let pool = Config::from_url(url.clone())
.builder() .builder()
@@ -47,15 +60,30 @@ impl RedisPool {
.and_then(|x| x.parse().ok()) .and_then(|x| x.parse().ok())
.unwrap_or(10000), .unwrap_or(10000),
) )
.wait_timeout(Some(wait_timeout))
.runtime(Runtime::Tokio1) .runtime(Runtime::Tokio1)
.build() .build()
.expect("Redis connection failed"); .expect("Redis connection failed");
RedisPool { let pool = RedisPool {
url, url,
pool, pool,
meta_namespace: meta_namespace.unwrap_or("".to_string()), meta_namespace: meta_namespace.unwrap_or("".to_string()),
};
let interval = Duration::from_secs(30);
let max_age = Duration::from_secs(5 * 60); // 5 minutes
let pool_ref = pool.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
pool_ref
.pool
.retain(|_, metrics| metrics.last_used() < max_age);
} }
});
pool
} }
pub async fn register_and_set_metrics( pub async fn register_and_set_metrics(
@@ -483,12 +511,15 @@ impl RedisPool {
if !subscribe_ids.is_empty() { if !subscribe_ids.is_empty() {
fetch_tasks.push(Either::Right(async { fetch_tasks.push(Either::Right(async {
let mut interval = let mut wait_time_ms = 50;
tokio::time::interval(Duration::from_millis(100));
let start = Utc::now(); let start = Utc::now();
let mut redis_budget = Duration::ZERO;
loop { loop {
let results = { let results = {
let acquire_start = Instant::now();
let mut connection = self.pool.get().await?; let mut connection = self.pool.get().await?;
redis_budget += acquire_start.elapsed();
cmd("MGET") cmd("MGET")
.arg( .arg(
subscribe_ids subscribe_ids
@@ -507,15 +538,31 @@ impl RedisPool {
.await? .await?
}; };
if results.into_iter().all(|x| x.is_none()) { let exist_count =
results.into_iter().filter(|x| x.is_some()).count();
// None of the locks exist anymore, we can continue
if exist_count == 0 {
break; break;
} }
if (Utc::now() - start) > chrono::Duration::seconds(5) { let spinning = Utc::now() - start;
return Err(DatabaseError::CacheTimeout); if spinning > chrono::Duration::seconds(5) {
return Err(DatabaseError::CacheTimeout {
locks_released: subscribe_ids.len() - exist_count,
locks_waiting: subscribe_ids.len(),
time_spent_pool_wait_ms: redis_budget.as_millis()
as u64,
time_spent_total_ms: spinning
.num_milliseconds()
.max(0)
as u64,
});
} }
interval.tick().await; tokio::time::sleep(Duration::from_millis(wait_time_ms))
.await;
wait_time_ms *= 2; // 50, 100, 200, 400, 800, 1600, 3200
} }
let (return_values, _) = let (return_values, _) =