use crate::database::models::ids::*; use crate::database::models::notification_item::DBNotification; use crate::database::models::notifications_deliveries_item::DBNotificationDelivery; use crate::database::models::notifications_template_item::NotificationTemplate; use crate::database::models::user_item::DBUser; use crate::database::redis::RedisPool; use crate::models::notifications::NotificationBody; use crate::models::v3::notifications::{ NotificationChannel, NotificationDeliveryStatus, }; use crate::routes::ApiError; use chrono::Utc; use futures::stream::{FuturesUnordered, StreamExt}; use lettre::message::Mailbox; use lettre::transport::smtp::authentication::Credentials; use lettre::transport::smtp::client::{Tls, TlsParameters}; use lettre::{AsyncSmtpTransport, AsyncTransport, Tokio1Executor}; use reqwest::Client; use sqlx::PgPool; use std::sync::Arc; use thiserror::Error; use tokio::sync::Mutex as TokioMutex; use tracing::{error, info, instrument, warn}; const EMAIL_RETRY_DELAY_SECONDS: i64 = 10; pub enum Mailer { Uninitialized, Initialized(Arc>), } impl Mailer { pub async fn to_transport( &mut self, ) -> Result>, MailError> { let maybe_transport = match self { Mailer::Uninitialized => { let username = dotenvy::var("SMTP_USERNAME")?; let password = dotenvy::var("SMTP_PASSWORD")?; let host = dotenvy::var("SMTP_HOST")?; let port = dotenvy::var("SMTP_PORT")?.parse::().unwrap_or(465); let creds = (!username.is_empty()) .then(|| Credentials::new(username, password)); let tls_setting = match dotenvy::var("SMTP_TLS")?.as_str() { "none" => Tls::None, "opportunistic_start_tls" => Tls::Opportunistic( TlsParameters::new(host.to_string())?, ), "requires_start_tls" => { Tls::Required(TlsParameters::new(host.to_string())?) } "tls" => { Tls::Wrapper(TlsParameters::new(host.to_string())?) } _ => { warn!( "Unrecognized SMTP TLS setting. Defaulting to TLS." ); Tls::Wrapper(TlsParameters::new(host.to_string())?) } }; let mut mailer = AsyncSmtpTransport::::relay(&host)? .port(port) .tls(tls_setting); if let Some(creds) = creds { mailer = mailer.credentials(creds); } let mailer = mailer.build(); let result = mailer.test_connection().await; match &result { Ok(true) => Some(Arc::new(mailer)), Ok(false) => { error!("SMTP NOOP failed, disabling mailer"); None } Err(error) => { error!(%error, "Failed to test SMTP connection, disabling mailer"); None } } } Mailer::Initialized(transport) => Some(Arc::clone(transport)), }; let transport = maybe_transport.ok_or_else(|| MailError::Uninitialized)?; *self = Mailer::Initialized(Arc::clone(&transport)); Ok(transport) } } #[derive(Error, Debug)] pub enum MailError { #[error("Environment Error")] Env(#[from] dotenvy::Error), #[error("Mail Error: {0}")] Mail(#[from] lettre::error::Error), #[error("Address Parse Error: {0}")] Address(#[from] lettre::address::AddressError), #[error("SMTP Error: {0}")] Smtp(#[from] lettre::transport::smtp::Error), #[error("Couldn't initialize SMTP transport")] Uninitialized, #[error("HTTP error fetching template: {0}")] HttpTemplate(#[from] reqwest::Error), } #[derive(Clone)] pub struct EmailQueue { pg: PgPool, client: reqwest::Client, redis: RedisPool, mailer: Arc>, identity: templates::MailingIdentity, } impl EmailQueue { /// Initializes the email queue from environment variables, and tests the SMTP connection. /// /// # Panic /// /// Panics if a TLS backend cannot be initialized by [`reqwest::ClientBuilder`]. pub fn init(pg: PgPool, redis: RedisPool) -> Result { Ok(Self { pg, redis, mailer: Arc::new(TokioMutex::new(Mailer::Uninitialized)), identity: templates::MailingIdentity::from_env()?, client: Client::builder() .user_agent("Modrinth") .build() .expect("Failed to build HTTP client"), }) } #[instrument(name = "EmailQueue::index", skip_all)] pub async fn index(&self) -> Result<(), ApiError> { let transport = self.mailer.lock().await.to_transport().await?; let begin = std::time::Instant::now(); let mut deliveries = DBNotificationDelivery::lock_channel_processable( NotificationChannel::Email, 50, &self.pg, ) .await?; if deliveries.is_empty() { return Ok(()); } let n_to_process = deliveries.len(); // Auto-fail deliveries which have been attempted over 3 times to avoid // ballooning the error rate. for d in deliveries.iter_mut().filter(|d| d.attempt_count >= 3) { d.status = NotificationDeliveryStatus::PermanentlyFailed; d.update(&self.pg).await?; } // We hold a FOR UPDATE lock on the rows here, so no other workers are accessing them // at the same time. let notification_ids = deliveries .iter() .filter(|d| d.attempt_count < 3) .map(|d| d.notification_id) .collect::>(); let notifications = DBNotification::get_many(¬ification_ids, &self.pg).await?; // For all notifications we collected, fill out the template // and send it via SMTP in parallel. let mut futures = FuturesUnordered::new(); for notification in notifications { let this = self.clone(); let transport = Arc::clone(&transport); futures.push(async move { let mut txn = this.pg.begin().await?; let maybe_user = DBUser::get_id( notification.user_id, &mut *txn, &this.redis, ) .await?; let Some(mailbox) = maybe_user .and_then(|user| user.email) .and_then(|email| email.parse().ok()) else { return Ok(( notification.id, NotificationDeliveryStatus::SkippedPreferences, )); }; this.send_one_with_transport( &mut txn, transport, notification.body, notification.user_id, mailbox, ) .await .map(|status| (notification.id, status)) }); } while let Some(result) = futures.next().await { match result { Ok((notification_id, status)) => { if let Some(idx) = deliveries .iter() .position(|d| d.notification_id == notification_id) { let update_next_attempt = status == NotificationDeliveryStatus::Pending; let mut delivery = deliveries.remove(idx); delivery.status = status; delivery.next_attempt += if update_next_attempt { chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS) } else { chrono::Duration::seconds(0) }; delivery.attempt_count += 1; delivery.update(&self.pg).await?; } } Err(error) => error!(%error, "Error building email"), } } for mut delivery in deliveries { // For these, there was an error building the email, like a // database error. Retry them after a delay. delivery.next_attempt = Utc::now() + chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS); delivery.update(&self.pg).await?; } info!( "Processed {} email deliveries in {}ms", n_to_process, begin.elapsed().as_millis() ); Ok(()) } pub async fn send_one( &self, txn: &mut sqlx::PgTransaction<'_>, notification: NotificationBody, user_id: DBUserId, address: Mailbox, ) -> Result { let transport = self.mailer.lock().await.to_transport().await?; self.send_one_with_transport( txn, transport, notification, user_id, address, ) .await } async fn send_one_with_transport( &self, txn: &mut sqlx::PgTransaction<'_>, transport: Arc>, notification: NotificationBody, user_id: DBUserId, address: Mailbox, ) -> Result { // If there isn't any template present in the database for the // notification type, skip it. let Some(template) = NotificationTemplate::list_channel( NotificationChannel::Email, &mut **txn, &self.redis, ) .await? .into_iter() .find(|t| t.notification_type == notification.notification_type()) else { return Ok(NotificationDeliveryStatus::SkippedDefault); }; let message = templates::build_email( txn, &self.redis, &self.client, user_id, ¬ification, &template, self.identity.clone(), address, ) .await?; let send_result = transport.send(message).await; Ok(send_result.map_or_else(|error| { error!(%error, smtp.code = ?extract_smtp_code(&error), "Error sending email"); if error.is_permanent() { NotificationDeliveryStatus::PermanentlyFailed } else { NotificationDeliveryStatus::Pending } }, |_| NotificationDeliveryStatus::Delivered)) } } fn extract_smtp_code(e: &lettre::transport::smtp::Error) -> Option { e.status().map(|x| x.into()) } mod templates;