diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 2da7ba9b0..f78ed334d 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -64,8 +64,26 @@ impl BackgroundTask { } pub async fn run_email(email_queue: EmailQueue) { - if let Err(error) = email_queue.index().await { - error!(%error, "Failed to index email queue"); + // Only index for 5 emails at a time, to reduce transaction length, + // for a total of 100 emails. + for _ in 0..20 { + let then = std::time::Instant::now(); + + match email_queue.index(5).await { + Ok(true) => { + info!( + "Indexed email queue in {}ms", + then.elapsed().as_millis() + ); + } + Ok(false) => { + info!("No more emails to index"); + break; + } + Err(error) => { + error!(%error, "Failed to index email queue"); + } + } } } diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs index e249af0c9..5336cf289 100644 --- a/apps/labrinth/src/queue/email.rs +++ b/apps/labrinth/src/queue/email.rs @@ -142,21 +142,27 @@ impl EmailQueue { }) } + /// Works on the email queue for up to `limit` items. + /// + /// Don't use a value too large for `limit`, as this method uses a single long running transaction to hold locks + /// on the deliveries. Something around 5 is good. + /// + /// Returns `Ok(false)` if no emails were processed, `Ok(true)` if some were processed. #[instrument(name = "EmailQueue::index", skip_all)] - pub async fn index(&self) -> Result<(), ApiError> { + pub async fn index(&self, limit: i64) -> Result { let transport = self.mailer.lock().await.to_transport().await?; let begin = std::time::Instant::now(); let mut deliveries = DBNotificationDelivery::lock_channel_processable( NotificationChannel::Email, - 50, + limit, &self.pg, ) .await?; if deliveries.is_empty() { - return Ok(()); + return Ok(false); } let n_to_process = deliveries.len(); @@ -263,7 +269,7 @@ impl EmailQueue { begin.elapsed().as_millis() ); - Ok(()) + Ok(true) } pub async fn send_one(