You've already forked AstralRinth
forked from didirus/AstralRinth
Increase index swap timeout, better index swap task o11y (#5171)
This commit is contained in:
committed by
GitHub
parent
7e1400d111
commit
ca1d66d070
@@ -44,7 +44,7 @@ const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
|
|||||||
pub async fn remove_documents(
|
pub async fn remove_documents(
|
||||||
ids: &[crate::models::ids::VersionId],
|
ids: &[crate::models::ids::VersionId],
|
||||||
config: &SearchConfig,
|
config: &SearchConfig,
|
||||||
) -> Result<(), meilisearch_sdk::errors::Error> {
|
) -> Result<(), IndexingError> {
|
||||||
let mut indexes = get_indexes_for_indexing(config, false).await?;
|
let mut indexes = get_indexes_for_indexing(config, false).await?;
|
||||||
let indexes_next = get_indexes_for_indexing(config, true).await?;
|
let indexes_next = get_indexes_for_indexing(config, true).await?;
|
||||||
|
|
||||||
@@ -167,11 +167,19 @@ pub async fn swap_index(
|
|||||||
|
|
||||||
client
|
client
|
||||||
.with_all_clients("swap_indexes", |client| async move {
|
.with_all_clients("swap_indexes", |client| async move {
|
||||||
client
|
let task = client
|
||||||
.swap_indexes([swap_indices_ref])
|
.swap_indexes([swap_indices_ref])
|
||||||
.await?
|
|
||||||
.wait_for_completion(client, None, Some(TIMEOUT))
|
|
||||||
.await
|
.await
|
||||||
|
.map_err(IndexingError::Indexing)?;
|
||||||
|
|
||||||
|
monitor_task(
|
||||||
|
client,
|
||||||
|
task,
|
||||||
|
Duration::from_secs(60 * 10), // 10 minutes
|
||||||
|
Some(Duration::from_secs(1)),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -182,7 +190,7 @@ pub async fn swap_index(
|
|||||||
pub async fn get_indexes_for_indexing(
|
pub async fn get_indexes_for_indexing(
|
||||||
config: &SearchConfig,
|
config: &SearchConfig,
|
||||||
next: bool, // Get the 'next' one
|
next: bool, // Get the 'next' one
|
||||||
) -> Result<Vec<Vec<Index>>, meilisearch_sdk::errors::Error> {
|
) -> Result<Vec<Vec<Index>>, IndexingError> {
|
||||||
let client = config.make_batch_client()?;
|
let client = config.make_batch_client()?;
|
||||||
let project_name = config.get_index_name("projects", next);
|
let project_name = config.get_index_name("projects", next);
|
||||||
let project_filtered_name =
|
let project_filtered_name =
|
||||||
@@ -343,6 +351,7 @@ async fn monitor_task(
|
|||||||
|
|
||||||
let id = task.get_task_uid();
|
let id = task.get_task_uid();
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||||
|
interval.reset();
|
||||||
|
|
||||||
let wait = task.wait_for_completion(client, poll, Some(timeout));
|
let wait = task.wait_for_completion(client, poll, Some(timeout));
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use crate::models::error::ApiError;
|
|
||||||
use crate::models::projects::SearchRequest;
|
use crate::models::projects::SearchRequest;
|
||||||
|
use crate::{models::error::ApiError, search::indexing::IndexingError};
|
||||||
use actix_web::HttpResponse;
|
use actix_web::HttpResponse;
|
||||||
use actix_web::http::StatusCode;
|
use actix_web::http::StatusCode;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
@@ -87,10 +87,10 @@ impl BatchClient {
|
|||||||
&'a self,
|
&'a self,
|
||||||
task_name: &str,
|
task_name: &str,
|
||||||
generator: G,
|
generator: G,
|
||||||
) -> Result<Vec<T>, meilisearch_sdk::errors::Error>
|
) -> Result<Vec<T>, IndexingError>
|
||||||
where
|
where
|
||||||
G: Fn(&'a Client) -> Fut,
|
G: Fn(&'a Client) -> Fut,
|
||||||
Fut: Future<Output = Result<T, meilisearch_sdk::errors::Error>> + 'a,
|
Fut: Future<Output = Result<T, IndexingError>> + 'a,
|
||||||
{
|
{
|
||||||
let mut tasks = FuturesOrdered::new();
|
let mut tasks = FuturesOrdered::new();
|
||||||
for (idx, client) in self.clients.iter().enumerate() {
|
for (idx, client) in self.clients.iter().enumerate() {
|
||||||
|
|||||||
Reference in New Issue
Block a user