From 5c29a8c7ddf667b111c5ab05062928f24f2a2ebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois-Xavier=20Talbot?= <108630700+fetchfern@users.noreply.github.com> Date: Fri, 23 Jan 2026 07:32:02 -0500 Subject: [PATCH] Batched search indexing (#5191) * Use RO pool for search indexing * Batched search indexing that actually works * Query cache --- ...f4eeff66ab4165a9f4980032e114db4dc1286.json | 26 ------------ ...3ae8987441bb5e89c9ea62d347e47899e3c2.json} | 8 ++-- ...d2402f52fea71e27b08e7926fcc2a9e62c0f3.json | 20 --------- ...afedb074492b4ec7f2457c14113f5fd13aa02.json | 18 -------- ...e5c93783c7641b019fdb698a1ec0be1393606.json | 17 -------- apps/labrinth/src/background_task.rs | 11 +++-- .../src/database/postgres_database.rs | 6 +++ apps/labrinth/src/main.rs | 1 + .../src/search/indexing/local_import.rs | 18 ++++++-- apps/labrinth/src/search/indexing/mod.rs | 42 +++++++++++++------ 10 files changed, 63 insertions(+), 104 deletions(-) delete mode 100644 apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json rename apps/labrinth/.sqlx/{query-b30d0365bd116fceee5de03fb9e3087a587633783894a5041889b856d47a4ed5.json => query-702a2826d5857dc51b1a7a79c9043ae8987441bb5e89c9ea62d347e47899e3c2.json} (86%) delete mode 100644 apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json delete mode 100644 apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json delete mode 100644 apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json diff --git a/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json b/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json deleted file mode 100644 index 921f7f92..00000000 --- a/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - 
"db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n status AS \"status: PayoutStatus\"\n FROM payouts\n ORDER BY id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "status: PayoutStatus", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286" -} diff --git a/apps/labrinth/.sqlx/query-b30d0365bd116fceee5de03fb9e3087a587633783894a5041889b856d47a4ed5.json b/apps/labrinth/.sqlx/query-702a2826d5857dc51b1a7a79c9043ae8987441bb5e89c9ea62d347e47899e3c2.json similarity index 86% rename from apps/labrinth/.sqlx/query-b30d0365bd116fceee5de03fb9e3087a587633783894a5041889b856d47a4ed5.json rename to apps/labrinth/.sqlx/query-702a2826d5857dc51b1a7a79c9043ae8987441bb5e89c9ea62d347e47899e3c2.json index 6142e7dc..56f47317 100644 --- a/apps/labrinth/.sqlx/query-b30d0365bd116fceee5de03fb9e3087a587633783894a5041889b856d47a4ed5.json +++ b/apps/labrinth/.sqlx/query-702a2826d5857dc51b1a7a79c9043ae8987441bb5e89c9ea62d347e47899e3c2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT m.id id, m.name name, m.summary summary, m.downloads downloads, m.follows follows,\n m.icon_url icon_url, m.updated updated, m.approved approved, m.published, m.license license, m.slug slug, m.color\n FROM mods m\n WHERE m.status = ANY($1)\n GROUP BY m.id;\n ", + "query": "\n SELECT m.id id, m.name name, m.summary summary, m.downloads downloads, m.follows follows,\n m.icon_url icon_url, m.updated updated, m.approved approved, m.published, m.license license, m.slug slug, m.color\n FROM mods m\n WHERE m.status = ANY($1) AND m.id > $3\n GROUP BY m.id\n ORDER BY m.id ASC\n LIMIT $2;\n ", "describe": { "columns": [ { @@ -66,7 +66,9 @@ ], "parameters": { "Left": [ - "TextArray" + "TextArray", + "Int8", + "Int8" ] }, "nullable": [ @@ -84,5 +86,5 @@ true ] }, - "hash": 
"b30d0365bd116fceee5de03fb9e3087a587633783894a5041889b856d47a4ed5" + "hash": "702a2826d5857dc51b1a7a79c9043ae8987441bb5e89c9ea62d347e47899e3c2" } diff --git a/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json b/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json deleted file mode 100644 index 89bd8147..00000000 --- a/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT status AS \"status: PayoutStatus\" FROM payouts WHERE id = 1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "status: PayoutStatus", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3" -} diff --git a/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json b/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json deleted file mode 100644 index 469c3016..00000000 --- a/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, $3, $4, $5, 10.0, NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Text", - "Varchar", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02" -} diff --git a/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json b/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json deleted file mode 100644 index 52e020eb..00000000 --- 
a/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, NULL, $3, $4, 10.00, NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Varchar", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606" -} diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 945414db..4f4bb346 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -31,7 +31,8 @@ impl BackgroundTask { #[allow(clippy::too_many_arguments)] pub async fn run( self, - pool: sqlx::Pool, + pool: sqlx::PgPool, + ro_pool: sqlx::PgPool, redis_pool: RedisPool, search_config: search::SearchConfig, clickhouse: clickhouse::Client, @@ -43,7 +44,9 @@ impl BackgroundTask { use BackgroundTask::*; match self { Migrations => run_migrations().await, - IndexSearch => index_search(pool, redis_pool, search_config).await, + IndexSearch => { + index_search(ro_pool, redis_pool, search_config).await + } ReleaseScheduled => release_scheduled(pool).await, UpdateVersions => update_versions(pool, redis_pool).await, Payouts => payouts(pool, clickhouse, redis_pool).await, @@ -117,12 +120,12 @@ pub async fn run_migrations() { } pub async fn index_search( - pool: sqlx::Pool, + ro_pool: sqlx::PgPool, redis_pool: RedisPool, search_config: search::SearchConfig, ) { info!("Indexing local database"); - let result = index_projects(pool, redis_pool, &search_config).await; + let result = index_projects(ro_pool, redis_pool, &search_config).await; if let Err(e) = result { warn!("Local project indexing failed: {:?}", e); } diff --git a/apps/labrinth/src/database/postgres_database.rs b/apps/labrinth/src/database/postgres_database.rs index 
0dd86d4c..7d6bed13 100644 --- a/apps/labrinth/src/database/postgres_database.rs +++ b/apps/labrinth/src/database/postgres_database.rs @@ -17,6 +17,12 @@ impl From for ReadOnlyPgPool { } } +impl ReadOnlyPgPool { + pub fn into_inner(self) -> PgPool { + self.0 + } +} + impl Deref for ReadOnlyPgPool { type Target = PgPool; diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index f785fc0a..2107c908 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -176,6 +176,7 @@ async fn app() -> std::io::Result<()> { info!("Running task {task:?} and exiting"); task.run( pool, + ro_pool.into_inner(), redis_pool, search_config, clickhouse, diff --git a/apps/labrinth/src/search/indexing/local_import.rs b/apps/labrinth/src/search/indexing/local_import.rs index d1b820f3..c19b1754 100644 --- a/apps/labrinth/src/search/indexing/local_import.rs +++ b/apps/labrinth/src/search/indexing/local_import.rs @@ -22,7 +22,9 @@ use sqlx::postgres::PgPool; pub async fn index_local( pool: &PgPool, -) -> Result, IndexingError> { + cursor: i64, + limit: i64, +) -> Result<(Vec, i64), IndexingError> { info!("Indexing local projects!"); // todo: loaders, project type, game versions @@ -45,13 +47,17 @@ pub async fn index_local( SELECT m.id id, m.name name, m.summary summary, m.downloads downloads, m.follows follows, m.icon_url icon_url, m.updated updated, m.approved approved, m.published, m.license license, m.slug slug, m.color FROM mods m - WHERE m.status = ANY($1) - GROUP BY m.id; + WHERE m.status = ANY($1) AND m.id > $3 + GROUP BY m.id + ORDER BY m.id ASC + LIMIT $2; ", &*crate::models::projects::ProjectStatus::iterator() .filter(|x| x.is_searchable()) .map(|x| x.to_string()) .collect::>(), + limit, + cursor, ) .fetch(pool) .map_ok(|m| { @@ -74,6 +80,10 @@ pub async fn index_local( let project_ids = db_projects.iter().map(|x| x.id.0).collect::>(); + let Some(largest) = project_ids.iter().max() else { + return Ok((vec![], i64::MAX)); + }; + struct PartialGallery { url: 
String, featured: bool, @@ -415,7 +425,7 @@ pub async fn index_local( } } - Ok(uploads) + Ok((uploads, *largest)) } struct PartialVersion { diff --git a/apps/labrinth/src/search/indexing/mod.rs b/apps/labrinth/src/search/indexing/mod.rs index 45def70d..488ee7e8 100644 --- a/apps/labrinth/src/search/indexing/mod.rs +++ b/apps/labrinth/src/search/indexing/mod.rs @@ -86,7 +86,7 @@ pub async fn remove_documents( } pub async fn index_projects( - pool: PgPool, + ro_pool: PgPool, redis: RedisPool, config: &SearchConfig, ) -> Result<(), IndexingError> { @@ -111,7 +111,7 @@ pub async fn index_projects( let all_loader_fields = crate::database::models::loader_fields::LoaderField::get_fields_all( - &pool, &redis, + &ro_pool, &redis, ) .await? .into_iter() @@ -120,17 +120,35 @@ pub async fn index_projects( info!("Gathering local projects"); - let uploads = index_local(&pool).await?; + let mut cursor = 0; + let mut idx = 0; + let mut total = 0; - info!("Adding projects to index"); + loop { + info!("Gathering index data chunk {idx}"); + idx += 1; - add_projects_batch_client( - &indices, - uploads, - all_loader_fields.clone(), - config, - ) - .await?; + let (uploads, next_cursor) = + index_local(&ro_pool, cursor, 10000).await?; + total += uploads.len(); + + if uploads.is_empty() { + info!( + "No more projects to index, indexed {total} projects after {idx} chunks" + ); + break; + } + + cursor = next_cursor; + + add_projects_batch_client( + &indices, + uploads, + all_loader_fields.clone(), + config, + ) + .await?; + } info!("Swapping indexes"); @@ -326,7 +344,7 @@ async fn add_to_index( monitor_task( client, task, Duration::from_secs(60 * 5), // Timeout after 5 minutes Some(Duration::from_secs(1)), // Poll once every second ) .await?;