Chunking searches (#787)

* new attempt

* revised searching CTEs

* prepare fix

* fix tests

* fixes

* restructured project_item to use queries

* search changes; ran fmt, clippy, and prepare

* small changes
This commit is contained in:
Wyatt Verchere
2023-12-08 22:14:17 -08:00
committed by GitHub
parent 945e5a2dc3
commit 235f4f10ef
28 changed files with 1784 additions and 1026 deletions

View File

@@ -12,14 +12,10 @@ use crate::models;
use crate::search::UploadSearchProject;
use sqlx::postgres::PgPool;
pub async fn index_local(
pub async fn get_all_ids(
pool: PgPool,
redis: &RedisPool,
) -> Result<(Vec<UploadSearchProject>, Vec<String>), IndexingError> {
info!("Indexing local projects!");
let loader_field_keys: Arc<DashSet<String>> = Arc::new(DashSet::new());
let all_visible_ids: HashMap<VersionId, (ProjectId, String)> = sqlx::query!(
) -> Result<Vec<(VersionId, ProjectId, String)>, IndexingError> {
let all_visible_ids: Vec<(VersionId, ProjectId, String)> = sqlx::query!(
"
SELECT v.id id, m.id mod_id, u.username owner_username
@@ -45,33 +41,48 @@ pub async fn index_local(
Ok(e.right().map(|m| {
let project_id: ProjectId = ProjectId(m.mod_id);
let version_id: VersionId = VersionId(m.id);
(version_id, (project_id, m.owner_username))
(version_id, project_id, m.owner_username)
}))
})
.try_collect::<HashMap<_, _>>()
.try_collect::<Vec<_>>()
.await?;
let project_ids = all_visible_ids
Ok(all_visible_ids)
}
pub async fn index_local(
pool: &PgPool,
redis: &RedisPool,
visible_ids: HashMap<VersionId, (ProjectId, String)>,
) -> Result<(Vec<UploadSearchProject>, Vec<String>), IndexingError> {
info!("Indexing local projects!");
let loader_field_keys: Arc<DashSet<String>> = Arc::new(DashSet::new());
let project_ids = visible_ids
.values()
.map(|(project_id, _)| project_id)
.cloned()
.collect::<Vec<_>>();
let projects: HashMap<_, _> = project_item::Project::get_many_ids(&project_ids, &pool, redis)
let projects: HashMap<_, _> = project_item::Project::get_many_ids(&project_ids, pool, redis)
.await?
.into_iter()
.map(|p| (p.inner.id, p))
.collect();
let version_ids = all_visible_ids.keys().cloned().collect::<Vec<_>>();
let versions: HashMap<_, _> = version_item::Version::get_many(&version_ids, &pool, redis)
info!("Fetched local projects!");
let version_ids = visible_ids.keys().cloned().collect::<Vec<_>>();
let versions: HashMap<_, _> = version_item::Version::get_many(&version_ids, pool, redis)
.await?
.into_iter()
.map(|v| (v.inner.id, v))
.collect();
info!("Fetched local versions!");
let mut uploads = Vec::new();
// TODO: could possibly clone less here?
for (version_id, (project_id, owner_username)) in all_visible_ids {
for (version_id, (project_id, owner_username)) in visible_ids {
let m = projects.get(&project_id);
let v = versions.get(&version_id);

View File

@@ -1,15 +1,21 @@
/// This module is used for the indexing from any source.
pub mod local_import;
use std::collections::HashMap;
use crate::database::redis::RedisPool;
use crate::search::{SearchConfig, UploadSearchProject};
use itertools::Itertools;
use local_import::index_local;
use log::info;
use meilisearch_sdk::client::Client;
use meilisearch_sdk::indexes::Index;
use meilisearch_sdk::settings::{PaginationSetting, Settings};
use sqlx::postgres::PgPool;
use thiserror::Error;
use self::local_import::get_all_ids;
#[derive(Error, Debug)]
pub enum IndexingError {
#[error("Error while connecting to the MeiliSearch database")]
@@ -31,6 +37,7 @@ pub enum IndexingError {
// assumes a max average size of 1KiB per project to avoid this cap.
const MEILISEARCH_CHUNK_SIZE: usize = 10000;
const FETCH_PROJECT_SIZE: usize = 5000;
pub async fn index_projects(
pool: PgPool,
redis: RedisPool,
@@ -39,10 +46,40 @@ pub async fn index_projects(
let mut docs_to_add: Vec<UploadSearchProject> = vec![];
let mut additional_fields: Vec<String> = vec![];
let (mut uploads, mut loader_fields) = index_local(pool.clone(), &redis).await?;
docs_to_add.append(&mut uploads);
additional_fields.append(&mut loader_fields);
let all_ids = get_all_ids(pool.clone()).await?;
let all_ids_len = all_ids.len();
info!("Got all ids, indexing {} projects", all_ids_len);
let mut so_far = 0;
let as_chunks: Vec<_> = all_ids
.into_iter()
.chunks(FETCH_PROJECT_SIZE)
.into_iter()
.map(|x| x.collect::<Vec<_>>())
.collect();
for id_chunk in as_chunks {
info!(
"Fetching chunk {}-{}/{}, size: {}",
so_far,
so_far + FETCH_PROJECT_SIZE,
all_ids_len,
id_chunk.len()
);
so_far += FETCH_PROJECT_SIZE;
let id_chunk = id_chunk
.into_iter()
.map(|(version_id, project_id, owner_username)| {
(version_id, (project_id, owner_username.to_lowercase()))
})
.collect::<HashMap<_, _>>();
let (mut uploads, mut loader_fields) = index_local(&pool, &redis, id_chunk).await?;
docs_to_add.append(&mut uploads);
additional_fields.append(&mut loader_fields);
}
info!("Got all ids, indexing...");
// Write Indices
add_projects(docs_to_add, additional_fields, config).await?;