Fix cache stampede issues + generalize cache (#884)

* caching changes

* fix cache stampede issues

* Use pub/sub for better DB fetches

* remove pubsub

* remove debugs

* Fix caches not working

* fix search indexing removal
This commit is contained in:
Geometrically
2024-03-26 21:15:50 -07:00
committed by GitHub
parent decfcb6c27
commit a0aa350a08
48 changed files with 1540 additions and 1655 deletions

View File

@@ -4,6 +4,8 @@ use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
use crate::models::collections::CollectionStatus;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
const COLLECTIONS_NAMESPACE: &str = "collections";
@@ -155,93 +157,55 @@ impl Collection {
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::TryStreamExt;
let val = redis
.get_cached_keys(
COLLECTIONS_NAMESPACE,
&collection_ids.iter().map(|x| x.0).collect::<Vec<_>>(),
|collection_ids| async move {
let collections = sqlx::query!(
"
SELECT c.id id, c.name name, c.description description,
c.icon_url icon_url, c.color color, c.created created, c.user_id user_id,
c.updated updated, c.status status,
ARRAY_AGG(DISTINCT cm.mod_id) filter (where cm.mod_id is not null) mods
FROM collections c
LEFT JOIN collections_mods cm ON cm.collection_id = c.id
WHERE c.id = ANY($1)
GROUP BY c.id;
",
&collection_ids,
)
.fetch(exec)
.try_fold(DashMap::new(), |acc, m| {
let collection = Collection {
id: CollectionId(m.id),
user_id: UserId(m.user_id),
name: m.name.clone(),
description: m.description.clone(),
icon_url: m.icon_url.clone(),
color: m.color.map(|x| x as u32),
created: m.created,
updated: m.updated,
status: CollectionStatus::from_string(&m.status),
projects: m
.mods
.unwrap_or_default()
.into_iter()
.map(ProjectId)
.collect(),
};
let mut redis = redis.connect().await?;
acc.insert(m.id, collection);
async move { Ok(acc) }
})
.await?;
if collection_ids.is_empty() {
return Ok(Vec::new());
}
let mut found_collections = Vec::new();
let mut remaining_collections: Vec<CollectionId> = collection_ids.to_vec();
if !collection_ids.is_empty() {
let collections = redis
.multi_get::<String>(
COLLECTIONS_NAMESPACE,
collection_ids.iter().map(|x| x.0.to_string()),
)
.await?;
for collection in collections {
if let Some(collection) =
collection.and_then(|x| serde_json::from_str::<Collection>(&x).ok())
{
remaining_collections.retain(|x| collection.id.0 != x.0);
found_collections.push(collection);
continue;
}
}
}
if !remaining_collections.is_empty() {
let collection_ids_parsed: Vec<i64> =
remaining_collections.iter().map(|x| x.0).collect();
let db_collections: Vec<Collection> = sqlx::query!(
"
SELECT c.id id, c.name name, c.description description,
c.icon_url icon_url, c.color color, c.created created, c.user_id user_id,
c.updated updated, c.status status,
ARRAY_AGG(DISTINCT cm.mod_id) filter (where cm.mod_id is not null) mods
FROM collections c
LEFT JOIN collections_mods cm ON cm.collection_id = c.id
WHERE c.id = ANY($1)
GROUP BY c.id;
",
&collection_ids_parsed,
Ok(collections)
},
)
.fetch_many(exec)
.try_filter_map(|e| async {
Ok(e.right().map(|m| {
let id = m.id;
Collection {
id: CollectionId(id),
user_id: UserId(m.user_id),
name: m.name.clone(),
description: m.description.clone(),
icon_url: m.icon_url.clone(),
color: m.color.map(|x| x as u32),
created: m.created,
updated: m.updated,
status: CollectionStatus::from_string(&m.status),
projects: m
.mods
.unwrap_or_default()
.into_iter()
.map(ProjectId)
.collect(),
}
}))
})
.try_collect::<Vec<Collection>>()
.await?;
for collection in db_collections {
redis
.set_serialized_to_json(
COLLECTIONS_NAMESPACE,
collection.id.0,
&collection,
None,
)
.await?;
found_collections.push(collection);
}
}
Ok(found_collections)
Ok(val)
}
pub async fn clear_cache(id: CollectionId, redis: &RedisPool) -> Result<(), DatabaseError> {