diff --git a/apps/labrinth/src/database/models/notifications_template_item.rs b/apps/labrinth/src/database/models/notifications_template_item.rs index c4687dfa..b75fe770 100644 --- a/apps/labrinth/src/database/models/notifications_template_item.rs +++ b/apps/labrinth/src/database/models/notifications_template_item.rs @@ -1,14 +1,18 @@ use crate::database::models::DatabaseError; use crate::database::redis::RedisPool; use crate::models::v3::notifications::{NotificationChannel, NotificationType}; +use crate::routes::ApiError; use serde::{Deserialize, Serialize}; const TEMPLATES_NAMESPACE: &str = "notifications_templates"; const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data"; +const TEMPLATES_DYNAMIC_HTML_NAMESPACE: &str = + "notifications_templates_dynamic_html"; + const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes +const TEMPLATES_CACHE_EXPIRY: i64 = 60 * 30; // 30 minutes #[derive(Debug, Clone, Serialize, Deserialize)] - pub struct NotificationTemplate { pub id: i64, pub channel: NotificationChannel, @@ -75,7 +79,7 @@ impl NotificationTemplate { TEMPLATES_NAMESPACE, channel.as_str(), &templates, - None, + Some(TEMPLATES_CACHE_EXPIRY), ) .await?; @@ -111,3 +115,44 @@ impl NotificationTemplate { .await } } + +pub async fn get_or_set_cached_dynamic_html( + redis: &RedisPool, + key: &str, + get: impl FnOnce() -> F, +) -> Result +where + F: Future>, +{ + #[derive(Debug, Clone, Serialize, Deserialize)] + struct HtmlBody { + html: String, + } + + let mut redis_conn = redis.connect().await?; + if let Some(body) = redis_conn + .get_deserialized_from_json::( + TEMPLATES_DYNAMIC_HTML_NAMESPACE, + key, + ) + .await? + { + return Ok(body.html); + } + + drop(redis_conn); + + let cached = HtmlBody { html: get().await? }; + let mut redis_conn = redis.connect().await?; + + redis_conn + .set_serialized_to_json( + TEMPLATES_DYNAMIC_HTML_NAMESPACE, + key, + &cached, + Some(HTML_DATA_CACHE_EXPIRY), + ) + .await?; + + Ok(cached.html) +} diff --git a/apps/labrinth/src/models/v2/notifications.rs b/apps/labrinth/src/models/v2/notifications.rs index 6ed53017..8619af06 100644 --- a/apps/labrinth/src/models/v2/notifications.rs +++ b/apps/labrinth/src/models/v2/notifications.rs @@ -139,6 +139,11 @@ pub enum LegacyNotificationBody { amount: u64, date_available: DateTime, }, + Custom { + key: String, + title: String, + body_md: String, + }, Unknown, } @@ -217,6 +222,7 @@ impl LegacyNotification { NotificationBody::PayoutAvailable { .. } => { Some("payout_available".to_string()) } + NotificationBody::Custom { .. } => Some("custom".to_string()), NotificationBody::LegacyMarkdown { notification_type, .. } => notification_type.clone(), @@ -378,6 +384,15 @@ impl LegacyNotification { service, currency, }, + NotificationBody::Custom { + title, + body_md, + key, + } => LegacyNotificationBody::Custom { + title, + body_md, + key, + }, NotificationBody::PaymentFailed { amount, service } => { LegacyNotificationBody::PaymentFailed { amount, service } } diff --git a/apps/labrinth/src/models/v3/notifications.rs b/apps/labrinth/src/models/v3/notifications.rs index 137f1585..118fe826 100644 --- a/apps/labrinth/src/models/v3/notifications.rs +++ b/apps/labrinth/src/models/v3/notifications.rs @@ -56,6 +56,7 @@ pub enum NotificationType { ProjectStatusNeutral, ProjectTransferred, PayoutAvailable, + Custom, Unknown, } @@ -89,6 +90,7 @@ impl NotificationType { NotificationType::ProjectStatusApproved => { "project_status_approved" } + NotificationType::Custom => "custom", NotificationType::ProjectStatusNeutral => "project_status_neutral", NotificationType::ProjectTransferred => "project_transferred", NotificationType::Unknown => "unknown", @@ -125,6 +127,7 @@ impl NotificationType { } "project_status_neutral" => NotificationType::ProjectStatusNeutral, "project_transferred" => NotificationType::ProjectTransferred, + "custom" => NotificationType::Custom, "unknown" => NotificationType::Unknown, _ => NotificationType::Unknown, } @@ -236,6 +239,11 @@ pub enum NotificationBody { date_available: DateTime, amount: u64, }, + Custom { + key: String, + title: String, + body_md: String, + }, Unknown, } @@ -313,6 +321,7 @@ impl NotificationBody { NotificationBody::PayoutAvailable { .. } => { NotificationType::PayoutAvailable } + NotificationBody::Custom { .. } => NotificationType::Custom, NotificationBody::Unknown => NotificationType::Unknown, } } @@ -557,6 +566,12 @@ impl From for Notification { "#".to_string(), vec![], ), + NotificationBody::Custom { title, .. } => ( + "Notification".to_string(), + title.clone(), + "#".to_string(), + vec![], + ), NotificationBody::Unknown => { ("".to_string(), "".to_string(), "#".to_string(), vec![]) } diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs index 0259918a..099eb278 100644 --- a/apps/labrinth/src/queue/email.rs +++ b/apps/labrinth/src/queue/email.rs @@ -4,7 +4,7 @@ use crate::database::models::notifications_deliveries_item::DBNotificationDelive use crate::database::models::notifications_template_item::NotificationTemplate; use crate::database::models::user_item::DBUser; use crate::database::redis::RedisPool; -use crate::models::notifications::NotificationBody; +use crate::models::notifications::{NotificationBody, NotificationType}; use crate::models::v3::notifications::{ NotificationChannel, NotificationDeliveryStatus, }; @@ -20,6 +20,7 @@ use sqlx::PgPool; use std::sync::Arc; use thiserror::Error; use tokio::sync::Mutex as TokioMutex; +use tokio::sync::Semaphore; use tracing::{error, info, instrument, warn}; const EMAIL_RETRY_DELAY_SECONDS: i64 = 10; @@ -187,13 +188,19 @@ impl EmailQueue { // For all notifications we collected, fill out the template // and send it via SMTP in parallel. - let mut futures = FuturesUnordered::new(); + // Some email notifications should still be processed sequentially. This is to avoid cache stampede in the + // case that processing the email can be heavy. For example, custom emails always make a POST request to modrinth.com, + // which, while not necessarily slow, is subject to heavy rate limiting. + let sequential_processing = Arc::new(Semaphore::new(1)); + for notification in notifications { let this = self.clone(); let transport = Arc::clone(&transport); + let seq = Arc::clone(&sequential_processing); + futures.push(async move { let mut txn = this.pg.begin().await?; @@ -214,15 +221,35 @@ impl EmailQueue { )); }; - this.send_one_with_transport( - &mut txn, - transport, - notification.body, - notification.user_id, - mailbox, - ) - .await - .map(|status| (notification.id, status)) + // For the cache stampede reasons mentioned above, we process custom emails exclusively sequentially. + // This could cause unnecessary slowness if we're sending a lot of custom emails with the same key in one go, + // and the cache is already populated (thus the sequential processing would not be needed). + let maybe_permit = if notification.body.notification_type() + == NotificationType::Custom + { + Some( + seq.acquire() + .await + .expect("Semaphore should never be closed"), + ) + } else { + None + }; + + let result = this + .send_one_with_transport( + &mut txn, + transport, + notification.body, + notification.user_id, + mailbox, + ) + .await + .map(|status| (notification.id, status)); + + drop(maybe_permit); + + result }); } diff --git a/apps/labrinth/src/queue/email/templates.rs b/apps/labrinth/src/queue/email/templates.rs index 02a99078..2a13baac 100644 --- a/apps/labrinth/src/queue/email/templates.rs +++ b/apps/labrinth/src/queue/email/templates.rs @@ -1,12 +1,15 @@ use super::MailError; use crate::database::models::ids::*; -use crate::database::models::notifications_template_item::NotificationTemplate; +use crate::database::models::notifications_template_item::{ + NotificationTemplate, get_or_set_cached_dynamic_html, +}; use crate::database::models::{ DBOrganization, DBProject, DBUser, DatabaseError, }; use crate::database::redis::RedisPool; use crate::models::v3::notifications::NotificationBody; use crate::routes::ApiError; +use crate::util::error::Context; use ariadne::ids::base62_impl::to_base62; use futures::TryFutureExt; use lettre::Message; @@ -138,12 +141,21 @@ pub async fn build_email( reply_address, } = from; - let (html_body_result, mut variables) = futures::try_join!( - get_html_body, - collect_template_variables(exec, redis, user_id, body) - )?; + let db_user = DBUser::get_id(user_id, &mut **exec, redis) + .await? + .ok_or(DatabaseError::Database(sqlx::Error::RowNotFound))?; - variables.insert(USER_EMAIL, to.email.to_string()); + let map = [ + (USER_NAME, db_user.username), + (USER_EMAIL, to.email.to_string()), + ] + .into_iter() + .collect(); + + let (html_body_result, either) = futures::try_join!( + get_html_body, + collect_template_variables(exec, redis, user_id, body, map) + )?; let mut message_builder = Message::builder().from(Mailbox::new( Some(from_name), @@ -157,28 +169,79 @@ pub async fn build_email( )); } - let subject = fill_template(&template.subject_line, &variables); - message_builder = message_builder.to(to).subject(subject); + struct Body { + plaintext: Option, + html: Option, + } - let plaintext_filled_body = - fill_template(&template.plaintext_fallback, &variables); + let (body, subject) = match either { + EmailTemplate::Static(variables) => { + let plaintext_filled_body = + fill_template(&template.plaintext_fallback, &variables); - let email_message = match html_body_result { - Ok(html_body) => { - let html_filled_body = fill_template(&html_body, &variables); - message_builder - .multipart(MultiPart::alternative_plain_html( - plaintext_filled_body, - html_filled_body, - )) - .map_err(MailError::from)? + let email_body = Body { + plaintext: Some(plaintext_filled_body), + html: match html_body_result { + Ok(html_body) => { + Some(fill_template(&html_body, &variables)) + } + Err(error) => { + error!(%error, "Failed to fetch template body"); + None + } + }, + }; + + let subject = fill_template(&template.subject_line, &variables); + + (email_body, subject) } - Err(error) => { - error!(%error, "Failed to fetch template body"); - message_builder - .singlepart(SinglePart::plain(plaintext_filled_body)) - .map_err(MailError::from)? + EmailTemplate::Dynamic { + variables, + body, + title, + } => { + let body = Body { + plaintext: None, + html: Some(fill_template(&body, &variables)), + }; + + (body, fill_template(&title, &variables)) + } + }; + + message_builder = message_builder.to(to).subject(subject); + + let email_message = match body { + Body { + plaintext: Some(plaintext), + html: Some(html), + } => message_builder + .multipart(MultiPart::alternative_plain_html(plaintext, html)) + .map_err(MailError::from)?, + + Body { + plaintext: Some(plaintext), + html: None, + } => message_builder + .singlepart(SinglePart::plain(plaintext)) + .map_err(MailError::from)?, + + Body { + plaintext: None, + html: Some(html), + } => message_builder + .singlepart(SinglePart::html(html)) + .map_err(MailError::from)?, + + Body { + plaintext: None, + html: None, + } => { + return Err(ApiError::Internal(eyre::eyre!( + "Neither HTML or plaintext could be generated" + ))); } }; @@ -218,23 +281,26 @@ fn fill_template( buffer } +enum EmailTemplate { + Static(HashMap<&'static str, String>), + Dynamic { + variables: HashMap<&'static str, String>, + body: String, + title: String, + }, +} + async fn collect_template_variables( exec: &mut sqlx::PgTransaction<'_>, redis: &RedisPool, user_id: DBUserId, n: &NotificationBody, -) -> Result, ApiError> { - let db_user = DBUser::get_id(user_id, &mut **exec, redis) - .await? - .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?; - - let mut map = HashMap::new(); - map.insert(USER_NAME, db_user.username); - + mut map: HashMap<&'static str, String>, +) -> Result { match &n { NotificationBody::PatCreated { token_name } => { map.insert(NEWPAT_TOKEN_NAME, token_name.clone()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ModerationMessageReceived { project_id, .. } => { @@ -250,7 +316,7 @@ async fn collect_template_variables( map.insert(PROJECT_ID, to_base62(project_id.0)); map.insert(PROJECT_NAME, result.name); map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ReportStatusUpdated { report_id } => { @@ -273,7 +339,7 @@ async fn collect_template_variables( map.insert(REPORT_ID, to_base62(report_id.0)); map.insert(REPORT_TITLE, result.title); map.insert(REPORT_DATE, date_human_readable(result.created)); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ReportSubmitted { report_id } => { @@ -294,7 +360,7 @@ async fn collect_template_variables( map.insert(REPORT_TITLE, result.title); map.insert(NEWREPORT_ID, to_base62(report_id.0)); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ProjectStatusApproved { project_id } => { @@ -310,7 +376,7 @@ async fn collect_template_variables( map.insert(PROJECT_ID, to_base62(project_id.0)); map.insert(PROJECT_NAME, result.name); map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ProjectStatusNeutral { @@ -332,7 +398,7 @@ async fn collect_template_variables( map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); map.insert(PROJECT_OLD_STATUS, old_status.as_str().to_string()); map.insert(PROJECT_NEW_STATUS, new_status.as_str().to_string()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ProjectTransferred { @@ -388,7 +454,7 @@ async fn collect_template_variables( map.insert(NEWOWNER_NAME, org.name); } - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::TeamInvite { team_id: _, @@ -419,7 +485,7 @@ async fn collect_template_variables( map.insert(TEAMINVITE_PROJECT_NAME, result.project_name); map.insert(TEAMINVITE_ROLE_NAME, role.clone()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::OrganizationInvite { @@ -451,7 +517,7 @@ async fn collect_template_variables( map.insert(ORGINVITE_ORG_NAME, result.organization_name); map.insert(ORGINVITE_ROLE_NAME, role.clone()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::StatusChange { @@ -479,7 +545,7 @@ async fn collect_template_variables( map.insert(STATUSCHANGE_OLD_STATUS, old_status.as_str().to_owned()); map.insert(STATUSCHANGE_NEW_STATUS, new_status.as_str().to_owned()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::ResetPassword { flow } => { @@ -492,7 +558,7 @@ async fn collect_template_variables( map.insert(RESETPASSWORD_URL, url); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::VerifyEmail { flow } => { @@ -505,20 +571,20 @@ async fn collect_template_variables( map.insert(VERIFYEMAIL_URL, url); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::AuthProviderAdded { provider } | NotificationBody::AuthProviderRemoved { provider } => { map.insert(AUTHPROVIDER_NAME, provider.clone()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::TwoFactorEnabled | NotificationBody::TwoFactorRemoved | NotificationBody::PasswordChanged - | NotificationBody::PasswordRemoved => Ok(map), + | NotificationBody::PasswordRemoved => Ok(EmailTemplate::Static(map)), NotificationBody::EmailChanged { new_email, @@ -526,7 +592,7 @@ async fn collect_template_variables( } => { map.insert(EMAILCHANGED_NEW_EMAIL, new_email.clone()); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::PaymentFailed { amount, service } => { @@ -541,7 +607,7 @@ async fn collect_template_variables( map.insert(PAYMENTFAILED_SERVICE, service.clone()); map.insert(BILLING_URL, url); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::PayoutAvailable { @@ -562,7 +628,7 @@ async fn collect_template_variables( format!("USD${:.2}", *amount as f64 / 100.0), ); - Ok(map) + Ok(EmailTemplate::Static(map)) } NotificationBody::TaxNotification { @@ -607,16 +673,59 @@ async fn collect_template_variables( map.insert(TAXNOTIFICATION_DUE, date_human_readable(*due)); map.insert(TAXNOTIFICATION_SERVICE, service.clone()); map.insert(SUBSCRIPTION_ID, to_base62(subscription_id.0)); - Ok(map) + Ok(EmailTemplate::Static(map)) } + NotificationBody::Custom { + title, + body_md, + key, + } => Ok(EmailTemplate::Dynamic { + variables: map, + body: dynamic_email_body(redis, title, body_md, key).await?, + title: title.to_string(), + }), + NotificationBody::ProjectUpdate { .. } | NotificationBody::ModeratorMessage { .. } | NotificationBody::LegacyMarkdown { .. } - | NotificationBody::Unknown => Ok(map), + | NotificationBody::Unknown => Ok(EmailTemplate::Static(map)), } } +async fn dynamic_email_body( + redis: &RedisPool, + title: &str, + body_md: &str, + key: &str, +) -> Result { + get_or_set_cached_dynamic_html(redis, key, || async { + let site_url = dotenvy::var("SITE_URL") + .wrap_internal_err("SITE_URL is not set")?; + let site_url = site_url.trim_end_matches('/'); + + let url = format!("{}/_internal/templates/email/dynamic", site_url); + + std::str::from_utf8( + reqwest::Client::new() + .post(url) + .json(&serde_json::json!({ + "title": title, + "body": body_md, + })) + .send() + .await + .and_then(|res| res.error_for_status())? + .bytes() + .await? + .as_ref(), + ) + .wrap_internal_err("email body is not valid UTF-8") + .map(ToOwned::to_owned) + }) + .await +} + fn date_human_readable(date: chrono::DateTime) -> String { date.format("%B %d, %Y").to_string() } diff --git a/apps/labrinth/src/routes/internal/external_notifications.rs b/apps/labrinth/src/routes/internal/external_notifications.rs index 0efa6276..a8e54c64 100644 --- a/apps/labrinth/src/routes/internal/external_notifications.rs +++ b/apps/labrinth/src/routes/internal/external_notifications.rs @@ -1,10 +1,15 @@ +use crate::auth::get_user_from_headers; use crate::database::models::ids::DBUserId; use crate::database::models::notification_item::NotificationBuilder; use crate::database::models::user_item::DBUser; use crate::database::redis::RedisPool; +use crate::models::users::Role; use crate::models::v3::notifications::NotificationBody; +use crate::models::v3::pats::Scopes; +use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::util::guards::external_notification_key_guard; +use actix_web::HttpRequest; use actix_web::web; use actix_web::{HttpResponse, post}; use ariadne::ids::UserId; @@ -12,7 +17,7 @@ use serde::Deserialize; use sqlx::PgPool; pub fn config(cfg: &mut web::ServiceConfig) { - cfg.service(create); + cfg.service(create).service(send_custom_email); } #[derive(Deserialize)] @@ -50,3 +55,64 @@ pub async fn create( Ok(HttpResponse::Accepted().finish()) } + +#[derive(Deserialize)] +struct SendEmail { + pub users: Vec, + pub key: String, + pub body_md: String, + pub title: String, +} + +#[post("external_notifications/send_custom_email")] +pub async fn send_custom_email( + req: HttpRequest, + pool: web::Data, + redis: web::Data, + session_queue: web::Data, + body: web::Json, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Scopes::SESSION_ACCESS, + ) + .await? + .1; + + if user.role != Role::Admin { + return Err(ApiError::CustomAuthentication( + "You do not have permission to send custom emails!".to_string(), + )); + } + + let SendEmail { + users, + body_md, + title, + key, + } = body.into_inner(); + + let users = users + .into_iter() + .map(|x| DBUserId(x.0 as i64)) + .collect::>(); + + let mut txn = pool.begin().await?; + + NotificationBuilder { + body: NotificationBody::Custom { + title, + body_md, + key, + }, + } + .insert_many(users, &mut txn, &redis) + .await?; + + txn.commit().await?; + + Ok(HttpResponse::Accepted().finish()) +}