1
0
Files
AstralRinth/apps/labrinth/src/database/models/notifications_deliveries_item.rs
François-Xavier Talbot 902d749293 [DO NOT MERGE] Email notification system (#4338)
* Migration

* Fixup db models

* Redis

* Stuff

* Switch PKs to BIGSERIALs, insert to notifications_deliveries when inserting notifications

* Queue, templates

* Query cache

* Fixes, fixtures

* Perf, cache template data & HTML bodies

* Notification type configuration, ResetPassword notification type

* Reset password

* Query cache

* Clippy + fmt

* Traces, fix typo, fix user email in ResetPassword

* send_email

* Models, db

* Remove dead code, adjust notification settings in migration

* Clippy fmt

* Delete dead code, fixes

* Fmt

* Update apps/labrinth/src/queue/email.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Remove old fixtures

* Unify email retry delay

* Fix type

* External notifications

* Remove `notifications_types_preference_restrictions`, as user notification preferences is out of scope for this PR

* Query cache, fmt, clippy

* Fix join in get_many_user_exposed_on_site

* Remove migration comment

* Query cache

* Update html body urls

* Remove comment

* Add paymentfailed.service variable to PaymentFailed notification variant

* Fix compile error

* Fix deleting notifications

* Update apps/labrinth/src/database/models/user_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update apps/labrinth/src/database/models/user_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update Cargo.toml

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update apps/labrinth/migrations/20250902133943_notification-extension.sql

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Address review comments

* Fix compliation

* Update apps/labrinth/src/database/models/users_notifications_preferences_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Use strfmt to format emails

* Configurable Reply-To

* Configurable Reply-To

* Refactor for email background task

* Send some emails inline

* Fix account creation email check

* Revert "Use strfmt to format emails"

This reverts commit e0d6614afe51fa6349918377e953ba294c34ae0b.

* Reintroduce fill_template

* Set password reset email inline

* Process more emails per index

* clippy fmt

* Query cache

---------

Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Josiah Glosson <soujournme@gmail.com>
2025-09-15 19:02:29 +00:00

163 lines
4.8 KiB
Rust

use super::ids::*;
use crate::database::models::DatabaseError;
use crate::models::v3::notifications::{
NotificationChannel, NotificationDeliveryStatus,
};
use chrono::{DateTime, Utc};
pub struct DBNotificationDelivery {
pub id: i64,
pub notification_id: DBNotificationId,
pub user_id: DBUserId,
pub channel: NotificationChannel,
pub delivery_priority: i32,
pub status: NotificationDeliveryStatus,
pub next_attempt: DateTime<Utc>,
pub attempt_count: i32,
}
struct NotificationDeliveryQueryResult {
id: i64,
notification_id: i64,
user_id: i64,
channel: String,
delivery_priority: i32,
status: String,
next_attempt: DateTime<Utc>,
attempt_count: i32,
}
macro_rules! select_notification_deliveries_with_predicate {
($predicate:literal $(, $($param0:expr $(, $param:expr)* $(,)?)?)?) => {
sqlx::query_as!(
NotificationDeliveryQueryResult,
r#"
SELECT
id, notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count
FROM notifications_deliveries
"#
+ $predicate
$($(, $param0 $(, $param)* )?)?
)
};
}
impl From<NotificationDeliveryQueryResult> for DBNotificationDelivery {
fn from(r: NotificationDeliveryQueryResult) -> Self {
DBNotificationDelivery {
id: r.id,
notification_id: DBNotificationId(r.notification_id),
user_id: DBUserId(r.user_id),
channel: NotificationChannel::from_str_or_default(&r.channel),
delivery_priority: r.delivery_priority,
status: NotificationDeliveryStatus::from_str_or_default(&r.status),
next_attempt: r.next_attempt,
attempt_count: r.attempt_count,
}
}
}
impl DBNotificationDelivery {
pub async fn get_all_user(
user_id: DBUserId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBNotificationDelivery>, DatabaseError> {
let user_id = user_id.0;
let results = select_notification_deliveries_with_predicate!(
"WHERE user_id = $1",
user_id
)
.fetch_all(exec)
.await?;
Ok(results.into_iter().map(|r| r.into()).collect())
}
/// Returns deliveries that should be processed next for a given channel using a row-level
/// `UPDATE` lock, barring the provided limit.
pub async fn lock_channel_processable(
channel: NotificationChannel,
limit: i64,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBNotificationDelivery>, DatabaseError> {
// This follows the `idx_notifications_deliveries_composite_queue` index.
Ok(select_notification_deliveries_with_predicate!(
"WHERE
status = $3
AND channel = $1
AND next_attempt <= NOW()
ORDER BY
delivery_priority DESC,
next_attempt ASC
LIMIT $2
FOR UPDATE
SKIP LOCKED
",
channel.as_str(),
limit,
NotificationDeliveryStatus::Pending.as_str()
)
.fetch_all(exec)
.await?
.into_iter()
.map(Into::into)
.collect())
}
/// Inserts the row into the table and updates its ID.
pub async fn insert(
&mut self,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<(), DatabaseError> {
let id = sqlx::query_scalar!(
"
INSERT INTO notifications_deliveries (
notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id
",
self.notification_id.0,
self.user_id.0,
self.channel.as_str(),
self.delivery_priority,
self.status.as_str(),
self.next_attempt,
self.attempt_count,
)
.fetch_one(exec)
.await?;
self.id = id;
Ok(())
}
/// Updates semantically mutable columns of the row.
pub async fn update(
&self,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
UPDATE notifications_deliveries
SET
delivery_priority = $2,
status = $3,
next_attempt = $4,
attempt_count = $5
WHERE id = $1
",
self.id,
self.delivery_priority,
self.status.as_str(),
self.next_attempt,
self.attempt_count,
)
.execute(exec)
.await?;
Ok(())
}
}