Custom Emails (#4526)

* Dynamic email template

* Set lower cache expiry for templates

* Custom email route

* Fix subject line on custom emails

* chore: query cache, clippy, fmt

* Bugfixes

* Key-based caching on custom emails

* Sequentially process emails prone to causing cache stampede

* Fill variables in dynamic body + subject line

* Update apps/labrinth/src/queue/email/templates.rs

Co-authored-by: aecsocket <aecsocket@tutanota.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update apps/labrinth/src/queue/email/templates.rs

Co-authored-by: aecsocket <aecsocket@tutanota.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

---------

Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>
Co-authored-by: aecsocket <aecsocket@tutanota.com>
This commit is contained in:
François-Xavier Talbot
2025-10-10 17:30:38 +01:00
committed by GitHub
parent aec49cff7c
commit 0c66fa3f12
6 changed files with 342 additions and 65 deletions

View File

@@ -4,7 +4,7 @@ use crate::database::models::notifications_deliveries_item::DBNotificationDelive
use crate::database::models::notifications_template_item::NotificationTemplate;
use crate::database::models::user_item::DBUser;
use crate::database::redis::RedisPool;
use crate::models::notifications::NotificationBody;
use crate::models::notifications::{NotificationBody, NotificationType};
use crate::models::v3::notifications::{
NotificationChannel, NotificationDeliveryStatus,
};
@@ -20,6 +20,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::Semaphore;
use tracing::{error, info, instrument, warn};
const EMAIL_RETRY_DELAY_SECONDS: i64 = 10;
@@ -187,13 +188,19 @@ impl EmailQueue {
// For all notifications we collected, fill out the template
// and send it via SMTP in parallel.
let mut futures = FuturesUnordered::new();
// Some email notifications should still be processed sequentially. This is to avoid cache stampede in the
// case that processing the email can be heavy. For example, custom emails always make a POST request to modrinth.com,
// which, while not necessarily slow, is subject to heavy rate limiting.
let sequential_processing = Arc::new(Semaphore::new(1));
for notification in notifications {
let this = self.clone();
let transport = Arc::clone(&transport);
let seq = Arc::clone(&sequential_processing);
futures.push(async move {
let mut txn = this.pg.begin().await?;
@@ -214,15 +221,35 @@ impl EmailQueue {
));
};
this.send_one_with_transport(
&mut txn,
transport,
notification.body,
notification.user_id,
mailbox,
)
.await
.map(|status| (notification.id, status))
// For the cache stampede reasons mentioned above, we process custom emails exclusively sequentially.
// This could cause unnecessary slowness if we're sending a lot of custom emails with the same key in one go,
// and the cache is already populated (thus the sequential processing would not be needed).
let maybe_permit = if notification.body.notification_type()
== NotificationType::Custom
{
Some(
seq.acquire()
.await
.expect("Semaphore should never be closed"),
)
} else {
None
};
let result = this
.send_one_with_transport(
&mut txn,
transport,
notification.body,
notification.user_id,
mailbox,
)
.await
.map(|status| (notification.id, status));
drop(maybe_permit);
result
});
}

View File

@@ -1,12 +1,15 @@
use super::MailError;
use crate::database::models::ids::*;
use crate::database::models::notifications_template_item::NotificationTemplate;
use crate::database::models::notifications_template_item::{
NotificationTemplate, get_or_set_cached_dynamic_html,
};
use crate::database::models::{
DBOrganization, DBProject, DBUser, DatabaseError,
};
use crate::database::redis::RedisPool;
use crate::models::v3::notifications::NotificationBody;
use crate::routes::ApiError;
use crate::util::error::Context;
use ariadne::ids::base62_impl::to_base62;
use futures::TryFutureExt;
use lettre::Message;
@@ -138,12 +141,21 @@ pub async fn build_email(
reply_address,
} = from;
let (html_body_result, mut variables) = futures::try_join!(
get_html_body,
collect_template_variables(exec, redis, user_id, body)
)?;
let db_user = DBUser::get_id(user_id, &mut **exec, redis)
.await?
.ok_or(DatabaseError::Database(sqlx::Error::RowNotFound))?;
variables.insert(USER_EMAIL, to.email.to_string());
let map = [
(USER_NAME, db_user.username),
(USER_EMAIL, to.email.to_string()),
]
.into_iter()
.collect();
let (html_body_result, either) = futures::try_join!(
get_html_body,
collect_template_variables(exec, redis, user_id, body, map)
)?;
let mut message_builder = Message::builder().from(Mailbox::new(
Some(from_name),
@@ -157,28 +169,79 @@ pub async fn build_email(
));
}
let subject = fill_template(&template.subject_line, &variables);
message_builder = message_builder.to(to).subject(subject);
struct Body {
plaintext: Option<String>,
html: Option<String>,
}
let plaintext_filled_body =
fill_template(&template.plaintext_fallback, &variables);
let (body, subject) = match either {
EmailTemplate::Static(variables) => {
let plaintext_filled_body =
fill_template(&template.plaintext_fallback, &variables);
let email_message = match html_body_result {
Ok(html_body) => {
let html_filled_body = fill_template(&html_body, &variables);
message_builder
.multipart(MultiPart::alternative_plain_html(
plaintext_filled_body,
html_filled_body,
))
.map_err(MailError::from)?
let email_body = Body {
plaintext: Some(plaintext_filled_body),
html: match html_body_result {
Ok(html_body) => {
Some(fill_template(&html_body, &variables))
}
Err(error) => {
error!(%error, "Failed to fetch template body");
None
}
},
};
let subject = fill_template(&template.subject_line, &variables);
(email_body, subject)
}
Err(error) => {
error!(%error, "Failed to fetch template body");
message_builder
.singlepart(SinglePart::plain(plaintext_filled_body))
.map_err(MailError::from)?
EmailTemplate::Dynamic {
variables,
body,
title,
} => {
let body = Body {
plaintext: None,
html: Some(fill_template(&body, &variables)),
};
(body, fill_template(&title, &variables))
}
};
message_builder = message_builder.to(to).subject(subject);
let email_message = match body {
Body {
plaintext: Some(plaintext),
html: Some(html),
} => message_builder
.multipart(MultiPart::alternative_plain_html(plaintext, html))
.map_err(MailError::from)?,
Body {
plaintext: Some(plaintext),
html: None,
} => message_builder
.singlepart(SinglePart::plain(plaintext))
.map_err(MailError::from)?,
Body {
plaintext: None,
html: Some(html),
} => message_builder
.singlepart(SinglePart::html(html))
.map_err(MailError::from)?,
Body {
plaintext: None,
html: None,
} => {
return Err(ApiError::Internal(eyre::eyre!(
"Neither HTML or plaintext could be generated"
)));
}
};
@@ -218,23 +281,26 @@ fn fill_template(
buffer
}
enum EmailTemplate {
Static(HashMap<&'static str, String>),
Dynamic {
variables: HashMap<&'static str, String>,
body: String,
title: String,
},
}
async fn collect_template_variables(
exec: &mut sqlx::PgTransaction<'_>,
redis: &RedisPool,
user_id: DBUserId,
n: &NotificationBody,
) -> Result<HashMap<&'static str, String>, ApiError> {
let db_user = DBUser::get_id(user_id, &mut **exec, redis)
.await?
.ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?;
let mut map = HashMap::new();
map.insert(USER_NAME, db_user.username);
mut map: HashMap<&'static str, String>,
) -> Result<EmailTemplate, ApiError> {
match &n {
NotificationBody::PatCreated { token_name } => {
map.insert(NEWPAT_TOKEN_NAME, token_name.clone());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ModerationMessageReceived { project_id, .. } => {
@@ -250,7 +316,7 @@ async fn collect_template_variables(
map.insert(PROJECT_ID, to_base62(project_id.0));
map.insert(PROJECT_NAME, result.name);
map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ReportStatusUpdated { report_id } => {
@@ -273,7 +339,7 @@ async fn collect_template_variables(
map.insert(REPORT_ID, to_base62(report_id.0));
map.insert(REPORT_TITLE, result.title);
map.insert(REPORT_DATE, date_human_readable(result.created));
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ReportSubmitted { report_id } => {
@@ -294,7 +360,7 @@ async fn collect_template_variables(
map.insert(REPORT_TITLE, result.title);
map.insert(NEWREPORT_ID, to_base62(report_id.0));
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ProjectStatusApproved { project_id } => {
@@ -310,7 +376,7 @@ async fn collect_template_variables(
map.insert(PROJECT_ID, to_base62(project_id.0));
map.insert(PROJECT_NAME, result.name);
map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ProjectStatusNeutral {
@@ -332,7 +398,7 @@ async fn collect_template_variables(
map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default());
map.insert(PROJECT_OLD_STATUS, old_status.as_str().to_string());
map.insert(PROJECT_NEW_STATUS, new_status.as_str().to_string());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ProjectTransferred {
@@ -388,7 +454,7 @@ async fn collect_template_variables(
map.insert(NEWOWNER_NAME, org.name);
}
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::TeamInvite {
team_id: _,
@@ -419,7 +485,7 @@ async fn collect_template_variables(
map.insert(TEAMINVITE_PROJECT_NAME, result.project_name);
map.insert(TEAMINVITE_ROLE_NAME, role.clone());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::OrganizationInvite {
@@ -451,7 +517,7 @@ async fn collect_template_variables(
map.insert(ORGINVITE_ORG_NAME, result.organization_name);
map.insert(ORGINVITE_ROLE_NAME, role.clone());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::StatusChange {
@@ -479,7 +545,7 @@ async fn collect_template_variables(
map.insert(STATUSCHANGE_OLD_STATUS, old_status.as_str().to_owned());
map.insert(STATUSCHANGE_NEW_STATUS, new_status.as_str().to_owned());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::ResetPassword { flow } => {
@@ -492,7 +558,7 @@ async fn collect_template_variables(
map.insert(RESETPASSWORD_URL, url);
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::VerifyEmail { flow } => {
@@ -505,20 +571,20 @@ async fn collect_template_variables(
map.insert(VERIFYEMAIL_URL, url);
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::AuthProviderAdded { provider }
| NotificationBody::AuthProviderRemoved { provider } => {
map.insert(AUTHPROVIDER_NAME, provider.clone());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::TwoFactorEnabled
| NotificationBody::TwoFactorRemoved
| NotificationBody::PasswordChanged
| NotificationBody::PasswordRemoved => Ok(map),
| NotificationBody::PasswordRemoved => Ok(EmailTemplate::Static(map)),
NotificationBody::EmailChanged {
new_email,
@@ -526,7 +592,7 @@ async fn collect_template_variables(
} => {
map.insert(EMAILCHANGED_NEW_EMAIL, new_email.clone());
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::PaymentFailed { amount, service } => {
@@ -541,7 +607,7 @@ async fn collect_template_variables(
map.insert(PAYMENTFAILED_SERVICE, service.clone());
map.insert(BILLING_URL, url);
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::PayoutAvailable {
@@ -562,7 +628,7 @@ async fn collect_template_variables(
format!("USD${:.2}", *amount as f64 / 100.0),
);
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::TaxNotification {
@@ -607,16 +673,59 @@ async fn collect_template_variables(
map.insert(TAXNOTIFICATION_DUE, date_human_readable(*due));
map.insert(TAXNOTIFICATION_SERVICE, service.clone());
map.insert(SUBSCRIPTION_ID, to_base62(subscription_id.0));
Ok(map)
Ok(EmailTemplate::Static(map))
}
NotificationBody::Custom {
title,
body_md,
key,
} => Ok(EmailTemplate::Dynamic {
variables: map,
body: dynamic_email_body(redis, title, body_md, key).await?,
title: title.to_string(),
}),
NotificationBody::ProjectUpdate { .. }
| NotificationBody::ModeratorMessage { .. }
| NotificationBody::LegacyMarkdown { .. }
| NotificationBody::Unknown => Ok(map),
| NotificationBody::Unknown => Ok(EmailTemplate::Static(map)),
}
}
async fn dynamic_email_body(
redis: &RedisPool,
title: &str,
body_md: &str,
key: &str,
) -> Result<String, ApiError> {
get_or_set_cached_dynamic_html(redis, key, || async {
let site_url = dotenvy::var("SITE_URL")
.wrap_internal_err("SITE_URL is not set")?;
let site_url = site_url.trim_end_matches('/');
let url = format!("{}/_internal/templates/email/dynamic", site_url);
std::str::from_utf8(
reqwest::Client::new()
.post(url)
.json(&serde_json::json!({
"title": title,
"body": body_md,
}))
.send()
.await
.and_then(|res| res.error_for_status())?
.bytes()
.await?
.as_ref(),
)
.wrap_internal_err("email body is not valid UTF-8")
.map(ToOwned::to_owned)
})
.await
}
fn date_human_readable(date: chrono::DateTime<chrono::Utc>) -> String {
date.format("%B %d, %Y").to_string()
}