Allow many Meilisearch write addrs (#5102)

* Write to many Meilisearch write addrs

* Keep client results ordered

* Attach Read Meilisearch client to actix data

* Load balanced meilisearch Compose profile

* Nginx config (round_robin)

* Fix nginx

* Meilisearch + nginx in same net

* Fix env vars example

* Fix env example again

* Fix env again

* Use try_collect with FuturesOrdered

* maybe fix remove_documents

* Clippy
This commit is contained in:
François-Xavier Talbot
2026-01-14 19:38:09 -05:00
committed by GitHub
parent 7dba9cbe54
commit 3ffa78aa07
9 changed files with 328 additions and 100 deletions

View File

@@ -12,7 +12,8 @@ DATABASE_URL=postgresql://labrinth:labrinth@labrinth-postgres/labrinth
DATABASE_MIN_CONNECTIONS=0
DATABASE_MAX_CONNECTIONS=16
MEILISEARCH_ADDR=http://labrinth-meilisearch:7700
MEILISEARCH_READ_ADDR=http://localhost:7700
MEILISEARCH_WRITE_ADDRS=http://localhost:7700
MEILISEARCH_KEY=modrinth
REDIS_URL=redis://labrinth-redis

View File

@@ -13,7 +13,13 @@ DATABASE_URL=postgresql://labrinth:labrinth@localhost/labrinth
DATABASE_MIN_CONNECTIONS=0
DATABASE_MAX_CONNECTIONS=16
MEILISEARCH_ADDR=http://localhost:7700
MEILISEARCH_READ_ADDR=http://localhost:7700
MEILISEARCH_WRITE_ADDRS=http://localhost:7700
# # For a sharded Meilisearch setup (sharded-meilisearch docker compose profile)
# MEILISEARCH_READ_ADDR=http://localhost:7710
# MEILISEARCH_WRITE_ADDRS=http://localhost:7700,http://localhost:7701
MEILISEARCH_KEY=modrinth
REDIS_URL=redis://localhost

View File

@@ -0,0 +1,14 @@
upstream meilisearch_upstream {
server meilisearch0:7700;
server meilisearch1:7700;
}
server {
listen 80;
location / {
proxy_pass http://meilisearch_upstream;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

View File

@@ -19,6 +19,7 @@ use crate::background_task::update_versions;
use crate::database::ReadOnlyPgPool;
use crate::queue::billing::{index_billing, index_subscriptions};
use crate::queue::moderation::AutomatedModerationQueue;
use crate::search::MeilisearchReadClient;
use crate::util::anrok;
use crate::util::archon::ArchonClient;
use crate::util::env::{parse_strings_from_var, parse_var};
@@ -68,6 +69,7 @@ pub struct LabrinthConfig {
pub email_queue: web::Data<EmailQueue>,
pub archon_client: web::Data<ArchonClient>,
pub gotenberg_client: GotenbergClient,
pub search_read_client: web::Data<MeilisearchReadClient>,
}
#[allow(clippy::too_many_arguments)]
@@ -274,6 +276,11 @@ pub fn app_setup(
file_host,
scheduler: Arc::new(scheduler),
ip_salt,
search_read_client: web::Data::new(
search_config.make_loadbalanced_read_client().expect(
"Failed to make Meilisearch client for read operations",
),
),
search_config,
session_queue,
payouts_queue: web::Data::new(PayoutsQueue::new()),
@@ -325,6 +332,7 @@ pub fn app_config(
.app_data(labrinth_config.archon_client.clone())
.app_data(web::Data::new(labrinth_config.stripe_client.clone()))
.app_data(web::Data::new(labrinth_config.anrok_client.clone()))
.app_data(labrinth_config.search_read_client.clone())
.app_data(labrinth_config.rate_limiter.clone())
.configure({
#[cfg(target_os = "linux")]
@@ -373,7 +381,8 @@ pub fn check_env_vars() -> bool {
failed |= check_var::<String>("LABRINTH_EXTERNAL_NOTIFICATION_KEY");
failed |= check_var::<String>("RATE_LIMIT_IGNORE_KEY");
failed |= check_var::<String>("DATABASE_URL");
failed |= check_var::<String>("MEILISEARCH_ADDR");
failed |= check_var::<String>("MEILISEARCH_READ_ADDR");
failed |= check_var::<String>("MEILISEARCH_WRITE_ADDRS");
failed |= check_var::<String>("MEILISEARCH_KEY");
failed |= check_var::<String>("REDIS_URL");
failed |= check_var::<String>("BIND_ADDR");

View File

@@ -13,7 +13,9 @@ use crate::queue::moderation::AutomatedModerationQueue;
use crate::queue::session::AuthQueue;
use crate::routes::v3::projects::ProjectIds;
use crate::routes::{ApiError, v2_reroute, v3};
use crate::search::{SearchConfig, SearchError, search_for_project};
use crate::search::{
MeilisearchReadClient, SearchConfig, SearchError, search_for_project,
};
use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@@ -54,6 +56,7 @@ pub fn config(cfg: &mut web::ServiceConfig) {
pub async fn project_search(
web::Query(info): web::Query<SearchRequest>,
config: web::Data<SearchConfig>,
read_client: web::Data<MeilisearchReadClient>,
) -> Result<HttpResponse, SearchError> {
// Search now uses loader_fields instead of explicit 'client_side' and 'server_side' fields
// While the backend for this has changed, it doesn't affect much
@@ -99,7 +102,7 @@ pub async fn project_search(
..info
};
let results = search_for_project(&info, &config).await?;
let results = search_for_project(&info, &config, &read_client).await?;
let results = LegacySearchResults::from(results);

View File

@@ -27,7 +27,9 @@ use crate::queue::moderation::AutomatedModerationQueue;
use crate::queue::session::AuthQueue;
use crate::routes::ApiError;
use crate::search::indexing::remove_documents;
use crate::search::{SearchConfig, SearchError, search_for_project};
use crate::search::{
MeilisearchReadClient, SearchConfig, SearchError, search_for_project,
};
use crate::util::img;
use crate::util::img::{delete_old_images, upload_image_optimized};
use crate::util::routes::read_limited_from_payload;
@@ -1037,8 +1039,9 @@ pub async fn edit_project_categories(
pub async fn project_search(
web::Query(info): web::Query<SearchRequest>,
config: web::Data<SearchConfig>,
read_client: web::Data<MeilisearchReadClient>,
) -> Result<HttpResponse, SearchError> {
let results = search_for_project(&info, &config).await?;
let results = search_for_project(&info, &config, &read_client).await?;
// TODO: add this back
// let results = ReturnSearchResults {

View File

@@ -7,14 +7,14 @@ use crate::database::redis::RedisPool;
use crate::search::{SearchConfig, UploadSearchProject};
use ariadne::ids::base62_impl::to_base62;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use futures::stream::FuturesOrdered;
use local_import::index_local;
use meilisearch_sdk::client::{Client, SwapIndexes};
use meilisearch_sdk::indexes::Index;
use meilisearch_sdk::settings::{PaginationSetting, Settings};
use sqlx::postgres::PgPool;
use thiserror::Error;
use tracing::info;
use tracing::{info, trace};
#[derive(Error, Debug)]
pub enum IndexingError {
@@ -43,31 +43,38 @@ pub async fn remove_documents(
config: &SearchConfig,
) -> Result<(), meilisearch_sdk::errors::Error> {
let mut indexes = get_indexes_for_indexing(config, false).await?;
let mut indexes_next = get_indexes_for_indexing(config, true).await?;
indexes.append(&mut indexes_next);
let indexes_next = get_indexes_for_indexing(config, true).await?;
let client = config.make_client()?;
let client = &client;
let mut deletion_tasks = FuturesUnordered::new();
for index in &indexes {
deletion_tasks.push(async move {
// After being successfully submitted, Meilisearch tasks are executed
// asynchronously, so wait some time for them to complete
index
.delete_documents(
&ids.iter().map(|x| to_base62(x.0)).collect::<Vec<_>>(),
)
.await?
.wait_for_completion(
client,
None,
Some(Duration::from_secs(15)),
)
.await
});
for list in &mut indexes {
for alt_list in &indexes_next {
list.extend(alt_list.iter().cloned());
}
}
let client = config.make_batch_client()?;
let client = &client;
let ids_base62 = ids.iter().map(|x| to_base62(x.0)).collect::<Vec<_>>();
let mut deletion_tasks = FuturesOrdered::new();
client.across_all(indexes, |index_list, client| {
for index in index_list {
let owned_client = client.clone();
let ids_base62_ref = &ids_base62;
deletion_tasks.push_back(async move {
index
.delete_documents(ids_base62_ref)
.await?
.wait_for_completion(
&owned_client,
None,
Some(Duration::from_secs(15)),
)
.await
});
}
});
while let Some(result) = deletion_tasks.next().await {
result?;
}
@@ -82,14 +89,20 @@ pub async fn index_projects(
) -> Result<(), IndexingError> {
info!("Indexing projects.");
trace!("Ensuring current indexes exists");
// First, ensure current index exists (so no error happens — current index should be worst-case empty, not missing)
get_indexes_for_indexing(config, false).await?;
trace!("Deleting surplus indexes");
// Then, delete the next index if it still exists
let indices = get_indexes_for_indexing(config, true).await?;
for index in indices {
index.delete().await?;
for client_indices in indices {
for index in client_indices {
index.delete().await?;
}
}
trace!("Recreating next index");
// Recreate the next index for indexing
let indices = get_indexes_for_indexing(config, true).await?;
@@ -103,15 +116,24 @@ pub async fn index_projects(
.collect::<Vec<_>>();
let uploads = index_local(&pool).await?;
add_projects(&indices, uploads, all_loader_fields.clone(), config).await?;
add_projects_batch_client(
&indices,
uploads,
all_loader_fields.clone(),
config,
)
.await?;
// Swap the index
swap_index(config, "projects").await?;
swap_index(config, "projects_filtered").await?;
// Delete the now-old index
for index in indices {
index.delete().await?;
for index_list in indices {
for index in index_list {
index.delete().await?;
}
}
info!("Done adding projects.");
@@ -122,17 +144,24 @@ pub async fn swap_index(
config: &SearchConfig,
index_name: &str,
) -> Result<(), IndexingError> {
let client = config.make_client()?;
let client = config.make_batch_client()?;
let index_name_next = config.get_index_name(index_name, true);
let index_name = config.get_index_name(index_name, false);
let swap_indices = SwapIndexes {
indexes: (index_name_next, index_name),
rename: None,
};
let swap_indices_ref = &swap_indices;
client
.swap_indexes([&swap_indices])
.await?
.wait_for_completion(&client, None, Some(TIMEOUT))
.with_all_clients("swap_indexes", |client| async move {
client
.swap_indexes([swap_indices_ref])
.await?
.wait_for_completion(client, None, Some(TIMEOUT))
.await
})
.await?;
Ok(())
@@ -141,41 +170,52 @@ pub async fn swap_index(
pub async fn get_indexes_for_indexing(
config: &SearchConfig,
next: bool, // Get the 'next' one
) -> Result<Vec<Index>, meilisearch_sdk::errors::Error> {
let client = config.make_client()?;
) -> Result<Vec<Vec<Index>>, meilisearch_sdk::errors::Error> {
let client = config.make_batch_client()?;
let project_name = config.get_index_name("projects", next);
let project_filtered_name =
config.get_index_name("projects_filtered", next);
let projects_index = create_or_update_index(
&client,
&project_name,
Some(&[
"words",
"typo",
"proximity",
"attribute",
"exactness",
"sort",
]),
)
.await?;
let projects_filtered_index = create_or_update_index(
&client,
&project_filtered_name,
Some(&[
"sort",
"words",
"typo",
"proximity",
"attribute",
"exactness",
]),
)
.await?;
Ok(vec![projects_index, projects_filtered_index])
let project_name_ref = &project_name;
let project_filtered_name_ref = &project_filtered_name;
let results = client
.with_all_clients("get_indexes_for_indexing", |client| async move {
let projects_index = create_or_update_index(
client,
project_name_ref,
Some(&[
"words",
"typo",
"proximity",
"attribute",
"exactness",
"sort",
]),
)
.await?;
let projects_filtered_index = create_or_update_index(
client,
project_filtered_name_ref,
Some(&[
"sort",
"words",
"typo",
"proximity",
"attribute",
"exactness",
]),
)
.await?;
Ok(vec![projects_index, projects_filtered_index])
})
.await?;
Ok(results)
}
#[tracing::instrument(skip_all, fields(%name))]
async fn create_or_update_index(
client: &Client,
name: &str,
@@ -302,16 +342,40 @@ async fn update_and_add_to_index(
Ok(())
}
pub async fn add_projects(
indices: &[Index],
pub async fn add_projects_batch_client(
indices: &[Vec<Index>],
projects: Vec<UploadSearchProject>,
additional_fields: Vec<String>,
config: &SearchConfig,
) -> Result<(), IndexingError> {
let client = config.make_client()?;
for index in indices {
update_and_add_to_index(&client, index, &projects, &additional_fields)
.await?;
let client = config.make_batch_client()?;
let index_references = indices
.iter()
.map(|x| x.iter().collect())
.collect::<Vec<Vec<&Index>>>();
let mut tasks = FuturesOrdered::new();
client.across_all(index_references, |index_list, client| {
for index in index_list {
let owned_client = client.clone();
let projects_ref = &projects;
let additional_fields_ref = &additional_fields;
tasks.push_back(async move {
update_and_add_to_index(
&owned_client,
index,
projects_ref,
additional_fields_ref,
)
.await
});
}
});
while let Some(result) = tasks.next().await {
result?;
}
Ok(())

View File

@@ -3,6 +3,8 @@ use crate::models::projects::SearchRequest;
use actix_web::HttpResponse;
use actix_web::http::StatusCode;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
use meilisearch_sdk::client::Client;
use serde::{Deserialize, Serialize};
@@ -11,6 +13,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use thiserror::Error;
use tracing::{Instrument, info_span};
pub mod indexing;
@@ -58,9 +61,71 @@ impl actix_web::ResponseError for SearchError {
}
}
#[derive(Debug, Clone)]
pub struct MeilisearchReadClient {
pub client: Client,
}
impl std::ops::Deref for MeilisearchReadClient {
type Target = Client;
fn deref(&self) -> &Self::Target {
&self.client
}
}
pub struct BatchClient {
pub clients: Vec<Client>,
}
impl BatchClient {
pub fn new(clients: Vec<Client>) -> Self {
Self { clients }
}
pub async fn with_all_clients<'a, T, G, Fut>(
&'a self,
task_name: &str,
generator: G,
) -> Result<Vec<T>, meilisearch_sdk::errors::Error>
where
G: Fn(&'a Client) -> Fut,
Fut: Future<Output = Result<T, meilisearch_sdk::errors::Error>> + 'a,
{
let mut tasks = FuturesOrdered::new();
for (idx, client) in self.clients.iter().enumerate() {
tasks.push_back(generator(client).instrument(info_span!(
"client_task",
task.name = task_name,
client.idx = idx,
)));
}
let results = tasks.try_collect::<Vec<T>>().await?;
Ok(results)
}
pub fn across_all<T, F, R>(&self, data: Vec<T>, mut predicate: F) -> Vec<R>
where
F: FnMut(T, &Client) -> R,
{
assert_eq!(
data.len(),
self.clients.len(),
"mismatch between data len and meilisearch client count"
);
self.clients
.iter()
.zip(data)
.map(|(client, item)| predicate(item, client))
.collect()
}
}
#[derive(Debug, Clone)]
pub struct SearchConfig {
pub address: String,
pub addresses: Vec<String>,
pub read_lb_address: String,
pub key: String,
pub meta_namespace: String,
}
@@ -69,22 +134,48 @@ impl SearchConfig {
// Panics if the environment variables are not set,
// but these are already checked for on startup.
pub fn new(meta_namespace: Option<String>) -> Self {
let address =
dotenvy::var("MEILISEARCH_ADDR").expect("MEILISEARCH_ADDR not set");
let address_many = dotenvy::var("MEILISEARCH_WRITE_ADDRS")
.expect("MEILISEARCH_WRITE_ADDRS not set");
let read_lb_address = dotenvy::var("MEILISEARCH_READ_ADDR")
.expect("MEILISEARCH_READ_ADDR not set");
let addresses = address_many
.split(',')
.filter(|s| !s.trim().is_empty())
.map(|s| s.to_string())
.collect::<Vec<String>>();
let key =
dotenvy::var("MEILISEARCH_KEY").expect("MEILISEARCH_KEY not set");
Self {
address,
addresses,
key,
meta_namespace: meta_namespace.unwrap_or_default(),
read_lb_address,
}
}
pub fn make_client(
pub fn make_loadbalanced_read_client(
&self,
) -> Result<Client, meilisearch_sdk::errors::Error> {
Client::new(self.address.as_str(), Some(self.key.as_str()))
) -> Result<MeilisearchReadClient, meilisearch_sdk::errors::Error> {
Ok(MeilisearchReadClient {
client: Client::new(&self.read_lb_address, Some(&self.key))?,
})
}
pub fn make_batch_client(
&self,
) -> Result<BatchClient, meilisearch_sdk::errors::Error> {
Ok(BatchClient::new(
self.addresses
.iter()
.map(|address| {
Client::new(address.as_str(), Some(self.key.as_str()))
})
.collect::<Result<Vec<_>, _>>()?,
))
}
// Next: true if we want the next index (we are preparing the next swap), false if we want the current index (searching)
@@ -192,9 +283,8 @@ pub fn get_sort_index(
pub async fn search_for_project(
info: &SearchRequest,
config: &SearchConfig,
client: &MeilisearchReadClient,
) -> Result<SearchResults, SearchError> {
let client = Client::new(&*config.address, Some(&*config.key))?;
let offset: usize = info.offset.as_deref().unwrap_or("0").parse()?;
let index = info.index.as_deref().unwrap_or("relevance");
let limit = info

View File

@@ -13,13 +13,15 @@ services:
POSTGRES_PASSWORD: labrinth
POSTGRES_HOST_AUTH_METHOD: trust
healthcheck:
test: ['CMD', 'pg_isready', '-U', 'labrinth']
test: [ 'CMD', 'pg_isready', '-U', 'labrinth' ]
interval: 3s
timeout: 5s
retries: 3
meilisearch:
meilisearch0:
image: getmeili/meilisearch:v1.12.0
container_name: labrinth-meilisearch
container_name: labrinth-meilisearch0
networks:
- meilisearch-mesh
restart: on-failure
ports:
- '127.0.0.1:7700:7700'
@@ -30,7 +32,7 @@ services:
MEILI_HTTP_PAYLOAD_SIZE_LIMIT: 107374182400
MEILI_LOG_LEVEL: warn
healthcheck:
test: ['CMD', 'curl', '--fail', 'http://localhost:7700/health']
test: [ 'CMD', 'curl', '--fail', 'http://localhost:7700/health' ]
interval: 3s
timeout: 5s
retries: 3
@@ -43,7 +45,7 @@ services:
volumes:
- redis-data:/data
healthcheck:
test: ['CMD', 'redis-cli', 'PING']
test: [ 'CMD', 'redis-cli', 'PING' ]
interval: 3s
timeout: 5s
retries: 3
@@ -56,7 +58,7 @@ services:
CLICKHOUSE_USER: default
CLICKHOUSE_PASSWORD: default
healthcheck:
test: ['CMD-SHELL', 'clickhouse-client --query "SELECT 1"']
test: [ 'CMD-SHELL', 'clickhouse-client --query "SELECT 1"' ]
interval: 3s
timeout: 5s
retries: 3
@@ -69,14 +71,7 @@ services:
environment:
MP_ENABLE_SPAMASSASSIN: postmark
healthcheck:
test:
[
'CMD',
'wget',
'-q',
'-O/dev/null',
'http://localhost:8025/api/v1/info',
]
test: [ 'CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:8025/api/v1/info' ]
interval: 3s
timeout: 5s
retries: 3
@@ -127,8 +122,7 @@ services:
LABRINTH_ENDPOINT: http://host.docker.internal:8000/_internal/delphi/ingest
LABRINTH_ADMIN_KEY: feedbeef
healthcheck:
test:
['CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:59999/health']
test: [ 'CMD', 'wget', '-q', '-O/dev/null', 'http://localhost:59999/health' ]
interval: 3s
timeout: 5s
retries: 3
@@ -140,8 +134,52 @@ services:
# Delphi must send a message on a webhook to our backend,
# so it must have access to our local network
- 'host.docker.internal:host-gateway'
# Sharded Meilisearch
meilisearch1:
profiles:
- sharded-meilisearch
image: getmeili/meilisearch:v1.12.0
container_name: labrinth-meilisearch1
restart: on-failure
networks:
- meilisearch-mesh
ports:
- '127.0.0.1:7701:7700'
volumes:
- meilisearch1-data:/data.ms
environment:
MEILI_MASTER_KEY: modrinth
MEILI_HTTP_PAYLOAD_SIZE_LIMIT: 107374182400
MEILI_LOG_LEVEL: warn
healthcheck:
test: [ 'CMD', 'curl', '--fail', 'http://localhost:7700/health' ]
interval: 3s
timeout: 5s
retries: 3
nginx-meilisearch-lb:
profiles:
- sharded-meilisearch
image: nginx:alpine
container_name: labrinth-meili-lb
networks:
- meilisearch-mesh
depends_on:
meilisearch0:
condition: service_healthy
meilisearch1:
condition: service_healthy
ports:
- '127.0.0.1:7710:80'
volumes:
- ./apps/labrinth/nginx/meili-lb.conf:/etc/nginx/conf.d/default.conf:ro
networks:
meilisearch-mesh:
driver: bridge
volumes:
meilisearch-data:
meilisearch1-data:
db-data:
redis-data:
labrinth-cdn-data: