Testing search prod (#791)

* testing push

* lowers it

* removed unwrap

* reduced to 500

* Really cut down time

* reorders chunking

* rebuild docker

* reverted most changes

* cargo fmt

* reduced meilisearch limit

* added logs, removed deletion of index

* one client creation

* changes

* reverted gallery change

* testing re-splitting again

* Remove chunking + index deletion

* Bring back chunking

* Update chunk size

---------

Co-authored-by: Jai A <jaiagr+gpg@pm.me>
Co-authored-by: Geometrically <18202329+Geometrically@users.noreply.github.com>
Authored by Wyatt Verchere on 2023-12-11 20:01:15 -08:00, committed by GitHub
parent 6217523cc8
commit 90954dac49
15 changed files with 274 additions and 315 deletions


@@ -1,11 +1,12 @@
/// This module is used for the indexing from any source.
pub mod local_import;
-use itertools::Itertools;
use std::collections::HashMap;
use crate::database::redis::RedisPool;
+use crate::models::ids::base62_impl::to_base62;
use crate::search::{SearchConfig, UploadSearchProject};
+use itertools::Itertools;
use local_import::index_local;
use log::info;
use meilisearch_sdk::client::Client;
@@ -34,23 +35,41 @@ pub enum IndexingError {
// The chunk size for adding projects to the indexing database. If the request size
// is too large (>10MiB) then the request fails with an error. This chunk size
-// assumes a max average size of 1KiB per project to avoid this cap.
-const MEILISEARCH_CHUNK_SIZE: usize = 10000;
+// assumes a max average size of 4KiB per project to avoid this cap.
+const MEILISEARCH_CHUNK_SIZE: usize = 2500; // Should be less than FETCH_PROJECT_SIZE
+const FETCH_PROJECT_SIZE: usize = 5000;
+const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
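The new constants amount to a payload budget: the comment above pins Meilisearch's request cap at roughly 10MiB, so 2,500 documents at an assumed 4KiB average stays just under it. A quick self-contained check of that arithmetic (the constant names here are local to this sketch):

```rust
// Back-of-envelope check of the chunk-size budget described above.
const MAX_PAYLOAD_BYTES: usize = 10 * 1024 * 1024; // ~10MiB request cap
const ASSUMED_DOC_BYTES: usize = 4 * 1024; // assumed average serialized project
const MEILISEARCH_CHUNK_SIZE: usize = 2500;

fn main() {
    let worst_case = MEILISEARCH_CHUNK_SIZE * ASSUMED_DOC_BYTES;
    // 2500 * 4096 = 10,240,000 bytes, under the 10,485,760-byte (10MiB) cap.
    assert!(worst_case <= MAX_PAYLOAD_BYTES);
    println!("worst-case chunk payload: {worst_case} bytes");
}
```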
+pub async fn remove_documents(
+ids: &[crate::models::ids::VersionId],
+config: &SearchConfig,
+) -> Result<(), meilisearch_sdk::errors::Error> {
+let indexes = get_indexes(config).await?;
+for index in indexes {
+index
+.delete_documents(&ids.iter().map(|x| to_base62(x.0)).collect::<Vec<_>>())
+.await?;
+}
+Ok(())
+}
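`remove_documents` is new public surface: it resolves every search index through `get_indexes` and deletes by base62-encoded version id, so "projects" and "projects_filtered" stay in sync. A hypothetical call site (the handler name and module paths are assumptions, not from this diff):

```rust
// Hypothetical usage sketch; `on_versions_deleted` is a placeholder.
use crate::models::ids::VersionId;
use crate::search::{indexing::remove_documents, SearchConfig};

async fn on_versions_deleted(
    version_ids: &[VersionId],
    config: &SearchConfig,
) -> Result<(), meilisearch_sdk::errors::Error> {
    // Deletion is applied to every index returned by get_indexes.
    remove_documents(version_ids, config).await?;
    Ok(())
}
```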
pub async fn index_projects(
pool: PgPool,
redis: RedisPool,
config: &SearchConfig,
) -> Result<(), IndexingError> {
-let mut docs_to_add: Vec<UploadSearchProject> = vec![];
-let mut additional_fields: Vec<String> = vec![];
+info!("Indexing projects.");
+let indices = get_indexes(config).await?;
let all_ids = get_all_ids(pool.clone()).await?;
+let all_ids_len = all_ids.len();
+info!("Got all ids, indexing {} projects", all_ids_len);
let mut so_far = 0;
let as_chunks: Vec<_> = all_ids
.into_iter()
.chunks(FETCH_PROJECT_SIZE)
@@ -74,126 +93,24 @@ pub async fn index_projects(
(version_id, (project_id, owner_username.to_lowercase()))
})
.collect::<HashMap<_, _>>();
-let (mut uploads, mut loader_fields) = index_local(&pool, &redis, id_chunk).await?;
-docs_to_add.append(&mut uploads);
-additional_fields.append(&mut loader_fields);
+let (uploads, loader_fields) = index_local(&pool, &redis, id_chunk).await?;
+info!("Got chunk, adding to docs_to_add");
+add_projects(&indices, uploads, loader_fields, config).await?;
}
info!("Got all ids, indexing...");
// Write Indices
add_projects(docs_to_add, additional_fields, config).await?;
info!("Done adding projects.");
Ok(())
}
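The structural change in `index_projects` is visible in the loop: the old version accumulated every `UploadSearchProject` into `docs_to_add` and wrote the indices once at the end, while the new version fetches `FETCH_PROJECT_SIZE` ids at a time, uploads that batch via `add_projects`, and drops it. A self-contained sketch of the pattern, with `fetch_chunk` and `upload_chunk` standing in for `index_local` and `add_projects`:

```rust
use itertools::Itertools;

// Stand-ins for index_local (DB fetch) and add_projects (Meilisearch upload).
fn fetch_chunk(ids: &[u64]) -> Vec<String> {
    ids.iter().map(|id| format!("doc-{id}")).collect()
}
fn upload_chunk(docs: Vec<String>) {
    println!("uploaded {} docs", docs.len());
}

fn main() {
    let all_ids: Vec<u64> = (0..12_345).collect();
    // Each 5,000-id batch is fetched, uploaded, and dropped before the next
    // one starts, so peak memory is one batch rather than the whole table.
    let batches = all_ids.into_iter().chunks(5000);
    for batch in &batches {
        let ids: Vec<u64> = batch.collect();
        upload_chunk(fetch_chunk(&ids));
    }
}
```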
-async fn create_index(
-client: &Client,
-name: &'static str,
-custom_rules: Option<&'static [&'static str]>,
-) -> Result<Index, IndexingError> {
-client
-.delete_index(name)
-.await?
-.wait_for_completion(client, None, None)
-.await?;
-match client.get_index(name).await {
-Ok(index) => {
-index
-.set_settings(&default_settings())
-.await?
-.wait_for_completion(client, None, None)
-.await?;
-Ok(index)
-}
-Err(meilisearch_sdk::errors::Error::Meilisearch(
-meilisearch_sdk::errors::MeilisearchError {
-error_code: meilisearch_sdk::errors::ErrorCode::IndexNotFound,
-..
-},
-)) => {
-// Only create index and set settings if the index doesn't already exist
-let task = client.create_index(name, Some("version_id")).await?;
-let task = task.wait_for_completion(client, None, None).await?;
-let index = task
-.try_make_index(client)
-.map_err(|_| IndexingError::Task)?;
-let mut settings = default_settings();
-if let Some(custom_rules) = custom_rules {
-settings = settings.with_ranking_rules(custom_rules);
-}
-index
-.set_settings(&settings)
-.await?
-.wait_for_completion(client, None, None)
-.await?;
-Ok(index)
-}
-Err(e) => {
-log::warn!("Unhandled error while creating index: {}", e);
-Err(IndexingError::Indexing(e))
-}
-}
-}
-async fn add_to_index(
-client: &Client,
-index: Index,
-mods: &[UploadSearchProject],
-) -> Result<(), IndexingError> {
-for chunk in mods.chunks(MEILISEARCH_CHUNK_SIZE) {
-index
-.add_documents(chunk, Some("version_id"))
-.await?
-.wait_for_completion(client, None, None)
-.await?;
-}
-Ok(())
-}
-async fn create_and_add_to_index(
-client: &Client,
-projects: &[UploadSearchProject],
-additional_fields: &[String],
-name: &'static str,
-custom_rules: Option<&'static [&'static str]>,
-) -> Result<(), IndexingError> {
-let index = create_index(client, name, custom_rules).await?;
-let mut new_filterable_attributes = index.get_filterable_attributes().await?;
-let mut new_displayed_attributes = index.get_displayed_attributes().await?;
-new_filterable_attributes.extend(additional_fields.iter().map(|s| s.to_string()));
-new_displayed_attributes.extend(additional_fields.iter().map(|s| s.to_string()));
-index
-.set_filterable_attributes(new_filterable_attributes)
-.await?;
-index
-.set_displayed_attributes(new_displayed_attributes)
-.await?;
-add_to_index(client, index, projects).await?;
-Ok(())
-}
-pub async fn add_projects(
-projects: Vec<UploadSearchProject>,
-additional_fields: Vec<String>,
+pub async fn get_indexes(
config: &SearchConfig,
-) -> Result<(), IndexingError> {
+) -> Result<Vec<Index>, meilisearch_sdk::errors::Error> {
let client = config.make_client();
-create_and_add_to_index(&client, &projects, &additional_fields, "projects", None).await?;
-create_and_add_to_index(
+let projects_index = create_or_update_index(&client, "projects", None).await?;
+let projects_filtered_index = create_or_update_index(
&client,
-&projects,
-&additional_fields,
"projects_filtered",
Some(&[
"sort",
@@ -206,6 +123,120 @@ pub async fn add_projects(
)
.await?;
+Ok(vec![projects_index, projects_filtered_index])
}
+async fn create_or_update_index(
+client: &Client,
+name: &'static str,
+custom_rules: Option<&'static [&'static str]>,
+) -> Result<Index, meilisearch_sdk::errors::Error> {
+info!("Updating/creating index.");
+match client.get_index(name).await {
+Ok(index) => {
+info!("Updating index settings.");
+let mut settings = default_settings();
+if let Some(custom_rules) = custom_rules {
+settings = settings.with_ranking_rules(custom_rules);
+}
+index
+.set_settings(&settings)
+.await?
+.wait_for_completion(client, None, Some(TIMEOUT))
+.await?;
+Ok(index)
+}
+_ => {
+info!("Creating index.");
+// Only create index and set settings if the index doesn't already exist
+let task = client.create_index(name, Some("version_id")).await?;
+let task = task
+.wait_for_completion(client, None, Some(TIMEOUT))
+.await?;
+let index = task
+.try_make_index(client)
+.map_err(|x| x.unwrap_failure())?;
+let mut settings = default_settings();
+if let Some(custom_rules) = custom_rules {
+settings = settings.with_ranking_rules(custom_rules);
+}
+index
+.set_settings(&settings)
+.await?
+.wait_for_completion(client, None, Some(TIMEOUT))
+.await?;
+Ok(index)
+}
+}
+}
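`create_or_update_index` is the piece that replaces the old delete-then-recreate flow: an existing index keeps its documents and only has its settings refreshed, so search keeps serving results while a reindex runs. Every task is awaited with `wait_for_completion(client, interval, timeout)`, where `None` falls back to the SDK defaults. A minimal sketch of that call shape, mirroring the settings update above:

```rust
use std::time::Duration;
use meilisearch_sdk::{client::Client, indexes::Index, settings::Settings};

// Minimal sketch of the wait_for_completion pattern used throughout this file.
async fn apply_settings(
    client: &Client,
    index: &Index,
    settings: &Settings,
) -> Result<(), meilisearch_sdk::errors::Error> {
    index
        .set_settings(settings) // enqueue the settings task
        .await?
        .wait_for_completion(client, None, Some(Duration::from_secs(60)))
        .await?; // poll at the default interval, give up after 60s
    Ok(())
}
```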
+async fn add_to_index(
+client: &Client,
+index: &Index,
+mods: &[UploadSearchProject],
+) -> Result<(), IndexingError> {
+for chunk in mods.chunks(MEILISEARCH_CHUNK_SIZE) {
+info!(
+"Adding chunk starting with version id {}",
+chunk[0].version_id
+);
+index
+.add_or_replace(chunk, Some("version_id"))
+.await?
+.wait_for_completion(client, None, Some(std::time::Duration::from_secs(3600)))
+.await?;
+info!("Added chunk of {} projects to index", chunk.len());
+}
+Ok(())
+}
+async fn update_and_add_to_index(
+client: &Client,
+index: &Index,
+projects: &[UploadSearchProject],
+additional_fields: &[String],
+) -> Result<(), IndexingError> {
+let mut new_filterable_attributes: Vec<String> = index.get_filterable_attributes().await?;
+let mut new_displayed_attributes = index.get_displayed_attributes().await?;
+new_filterable_attributes.extend(additional_fields.iter().map(|s| s.to_string()));
+new_displayed_attributes.extend(additional_fields.iter().map(|s| s.to_string()));
+info!("add attributes.");
+index
+.set_filterable_attributes(new_filterable_attributes)
+.await?;
+index
+.set_displayed_attributes(new_displayed_attributes)
+.await?;
+info!("Adding to index.");
+add_to_index(client, index, projects).await?;
+Ok(())
+}
+pub async fn add_projects(
+indices: &[Index],
+projects: Vec<UploadSearchProject>,
+additional_fields: Vec<String>,
+config: &SearchConfig,
+) -> Result<(), IndexingError> {
+let client = config.make_client();
+for index in indices {
+info!("adding projects part1 or 2.");
+update_and_add_to_index(&client, index, &projects, &additional_fields).await?;
+}
+Ok(())
+}
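Taken together, `get_indexes` and the reworked `add_projects` give callers outside the bulk reindex a small surface for targeted updates. A hypothetical single-document reindex built on them (this helper is illustrative, not part of the commit):

```rust
// Hypothetical helper; mirrors how index_projects drives the new API.
async fn reindex_one(
    upload: UploadSearchProject,
    loader_fields: Vec<String>,
    config: &SearchConfig,
) -> Result<(), IndexingError> {
    let indices = get_indexes(config).await?; // create-or-update, never delete
    add_projects(&indices, vec![upload], loader_fields, config).await?;
    Ok(())
}
```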


@@ -164,8 +164,8 @@ pub struct ResultSearchProject {
pub requested_status: Option<String>,
pub loaders: Vec<String>, // Search uses loaders as categories- this is purely for the Project model.
pub links: Vec<LinkUrl>,
-pub games: Vec<String>, // Todo: in future, could be a searchable field.
pub gallery_items: Vec<GalleryItem>, // Gallery *only* urls are stored in gallery, but the gallery items are stored here- required for the Project model.
+pub games: Vec<String>, // Todo: in future, could be a searchable field.
pub organization_id: Option<String>, // Todo: in future, could be a searchable field.
#[serde(flatten)]
@@ -237,7 +237,7 @@ pub async fn search_for_project(
if facet.is_array() {
serde_json::from_value::<Vec<String>>(facet).unwrap_or_default()
} else {
-vec![serde_json::from_value::<String>(facet.clone())
+vec![serde_json::from_value::<String>(facet)
.unwrap_or_default()]
}
})
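The dropped `clone()` in the last hunk works because `serde_json::from_value` takes its `Value` by value and only one branch of the if/else runs, so `facet` is moved exactly once. A self-contained illustration of the same shape:

```rust
use serde_json::{json, Value};

// `facet` is consumed by whichever branch runs; no clone needed.
fn facet_to_vec(facet: Value) -> Vec<String> {
    if facet.is_array() {
        serde_json::from_value::<Vec<String>>(facet).unwrap_or_default()
    } else {
        vec![serde_json::from_value::<String>(facet).unwrap_or_default()]
    }
}

fn main() {
    assert_eq!(facet_to_vec(json!(["a", "b"])), vec!["a", "b"]);
    assert_eq!(facet_to_vec(json!("solo")), vec!["solo"]);
}
```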