Better observability for search indexing, fix remove_documents (#5143)

* Better observability for search timeout, fix remove_documents

* Log client idx
This commit is contained in:
François-Xavier Talbot
2026-01-16 16:51:51 -05:00
committed by GitHub
parent 240e5455cc
commit 2c096a85d6

View File

@@ -14,7 +14,7 @@ use meilisearch_sdk::indexes::Index;
use meilisearch_sdk::settings::{PaginationSetting, Settings};
use sqlx::postgres::PgPool;
use thiserror::Error;
use tracing::{info, trace};
use tracing::{Instrument, error, info, info_span, instrument, trace};
#[derive(Error, Debug)]
pub enum IndexingError {
@@ -36,7 +36,7 @@ pub enum IndexingError {
// is too large (>10MiB) then the request fails with an error. This chunk size
// assumes a max average size of 4KiB per project to avoid this cap.
const MEILISEARCH_CHUNK_SIZE: usize = 10000000;
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
pub async fn remove_documents(
ids: &[crate::models::ids::VersionId],
@@ -167,6 +167,7 @@ pub async fn swap_index(
Ok(())
}
#[instrument(skip(config))]
pub async fn get_indexes_for_indexing(
config: &SearchConfig,
next: bool, // Get the 'next' one
@@ -215,13 +216,13 @@ pub async fn get_indexes_for_indexing(
Ok(results)
}
#[tracing::instrument(skip_all, fields(%name))]
#[instrument(skip_all, fields(name))]
async fn create_or_update_index(
client: &Client,
name: &str,
custom_rules: Option<&'static [&'static str]>,
) -> Result<Index, meilisearch_sdk::errors::Error> {
info!("Updating/creating index {}", name);
info!("Updating/creating index");
match client.get_index(name).await {
Ok(index) => {
@@ -236,9 +237,13 @@ async fn create_or_update_index(
info!("Performing index settings set.");
index
.set_settings(&settings)
.await?
.await
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error setting index settings while waiting: {e:?}")
})?;
info!("Done performing index settings set.");
Ok(index)
@@ -250,7 +255,10 @@ async fn create_or_update_index(
let task = client.create_index(name, Some("version_id")).await?;
let task = task
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error creating index while waiting: {e:?}")
})?;
let index = task
.try_make_index(client)
.map_err(|x| x.unwrap_failure())?;
@@ -263,15 +271,20 @@ async fn create_or_update_index(
index
.set_settings(&settings)
.await?
.await
.inspect_err(|e| error!("Error setting index settings: {e:?}"))?
.wait_for_completion(client, None, Some(TIMEOUT))
.await?;
.await
.inspect_err(|e| {
error!("Error setting index settings while waiting: {e:?}")
})?;
Ok(index)
}
}
}
#[instrument(skip_all, fields(index.name, mods.len = mods.len()))]
async fn add_to_index(
client: &Client,
index: &Index,
@@ -282,21 +295,31 @@ async fn add_to_index(
"Adding chunk starting with version id {}",
chunk[0].version_id
);
let now = std::time::Instant::now();
index
.add_or_replace(chunk, Some("version_id"))
.await?
.await
.inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?
.wait_for_completion(
client,
None,
Some(std::time::Duration::from_secs(3600)),
Some(std::time::Duration::from_secs(7200)), // 2 hours
)
.await?;
info!("Added chunk of {} projects to index", chunk.len());
.await
.inspect_err(|e| error!("Error adding chunk to index: {e:?}"))?;
info!(
"Added chunk of {} projects to index in {:.2} seconds",
chunk.len(),
now.elapsed().as_secs_f64()
);
}
Ok(())
}
#[instrument(skip_all, fields(index.name))]
async fn update_and_add_to_index(
client: &Client,
index: &Index,
@@ -357,20 +380,28 @@ pub async fn add_projects_batch_client(
let mut tasks = FuturesOrdered::new();
let mut id = 0;
client.across_all(index_references, |index_list, client| {
let span = info_span!("add_projects_batch", client.idx = id);
id += 1;
for index in index_list {
let owned_client = client.clone();
let projects_ref = &projects;
let additional_fields_ref = &additional_fields;
tasks.push_back(async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
});
tasks.push_back(
async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
}
.instrument(span.clone()),
);
}
});