[DO NOT MERGE] Email notification system (#4338)

* Migration

* Fixup db models

* Redis

* Stuff

* Switch PKs to BIGSERIALs, insert to notifications_deliveries when inserting notifications

* Queue, templates

* Query cache

* Fixes, fixtures

* Perf, cache template data & HTML bodies

* Notification type configuration, ResetPassword notification type

* Reset password

* Query cache

* Clippy + fmt

* Traces, fix typo, fix user email in ResetPassword

* send_email

* Models, db

* Remove dead code, adjust notification settings in migration

* Clippy fmt

* Delete dead code, fixes

* Fmt

* Update apps/labrinth/src/queue/email.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Remove old fixtures

* Unify email retry delay

* Fix type

* External notifications

* Remove `notifications_types_preference_restrictions`, as user notification preferences is out of scope for this PR

* Query cache, fmt, clippy

* Fix join in get_many_user_exposed_on_site

* Remove migration comment

* Query cache

* Update html body urls

* Remove comment

* Add paymentfailed.service variable to PaymentFailed notification variant

* Fix compile error

* Fix deleting notifications

* Update apps/labrinth/src/database/models/user_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update apps/labrinth/src/database/models/user_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update Cargo.toml

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Update apps/labrinth/migrations/20250902133943_notification-extension.sql

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Address review comments

* Fix compliation

* Update apps/labrinth/src/database/models/users_notifications_preferences_item.rs

Co-authored-by: Josiah Glosson <soujournme@gmail.com>
Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>

* Use strfmt to format emails

* Configurable Reply-To

* Configurable Reply-To

* Refactor for email background task

* Send some emails inline

* Fix account creation email check

* Revert "Use strfmt to format emails"

This reverts commit e0d6614afe51fa6349918377e953ba294c34ae0b.

* Reintroduce fill_template

* Set password reset email inline

* Process more emails per index

* clippy fmt

* Query cache

---------

Signed-off-by: François-Xavier Talbot <108630700+fetchfern@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Josiah Glosson <soujournme@gmail.com>
This commit is contained in:
François-Xavier Talbot
2025-09-15 15:02:29 -04:00
committed by GitHub
parent 1491642209
commit 902d749293
51 changed files with 2958 additions and 3652 deletions

View File

@@ -10,6 +10,9 @@ pub mod image_item;
pub mod legacy_loader_fields;
pub mod loader_fields;
pub mod notification_item;
pub mod notifications_deliveries_item;
pub mod notifications_template_item;
pub mod notifications_type_item;
pub mod oauth_client_authorization_item;
pub mod oauth_client_item;
pub mod oauth_token_item;
@@ -26,6 +29,7 @@ pub mod thread_item;
pub mod user_item;
pub mod user_subscription_item;
pub mod users_compliance;
pub mod users_notifications_preferences_item;
pub mod users_redeemals;
pub mod version_item;

View File

@@ -1,6 +1,8 @@
use super::ids::*;
use crate::database::{models::DatabaseError, redis::RedisPool};
use crate::models::notifications::NotificationBody;
use crate::models::notifications::{
NotificationBody, NotificationChannel, NotificationDeliveryStatus,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
@@ -55,6 +57,10 @@ impl NotificationBuilder {
.map(|_| body.clone())
.collect::<Vec<_>>();
let users_raw_ids = users.iter().map(|x| x.0).collect::<Vec<_>>();
let notification_ids =
notification_ids.iter().map(|x| x.0).collect::<Vec<_>>();
sqlx::query!(
"
INSERT INTO notifications (
@@ -62,16 +68,97 @@ impl NotificationBuilder {
)
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::jsonb[])
",
&notification_ids
.into_iter()
.map(|x| x.0)
.collect::<Vec<_>>()[..],
&users.iter().map(|x| x.0).collect::<Vec<_>>()[..],
&notification_ids[..],
&users_raw_ids[..],
&bodies[..],
)
.execute(&mut **transaction)
.await?;
let notification_types = notification_ids
.iter()
.map(|_| self.body.notification_type().as_str())
.collect::<Vec<_>>();
let notification_channels = NotificationChannel::list()
.iter()
.map(|x| x.as_str())
.collect::<Vec<&str>>();
// Insert required rows into `notifications_deliveries` by channel
// and notification type, based on the user's preferences.
let query = sqlx::query!(
r#"
WITH
channels AS (
SELECT channel FROM UNNEST($1::varchar[]) AS t(channel)
),
delivery_candidates AS (
SELECT
ids.notification_id,
ids.user_id,
channels.channel,
nt.delivery_priority,
uprefs.enabled user_enabled,
dprefs.enabled default_enabled
FROM
UNNEST(
$2::bigint[],
$3::bigint[],
$4::varchar[]
) AS ids(notification_id, user_id, notification_type)
CROSS JOIN channels
INNER JOIN
notifications_types nt ON nt.name = ids.notification_type
LEFT JOIN users_notifications_preferences uprefs
ON uprefs.user_id = ids.user_id
AND uprefs.channel = channels.channel
AND uprefs.notification_type = ids.notification_type
LEFT JOIN users_notifications_preferences dprefs
ON dprefs.user_id IS NULL
AND dprefs.channel = channels.channel
AND dprefs.notification_type = ids.notification_type
)
INSERT INTO notifications_deliveries
(notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count)
SELECT
dc.notification_id,
dc.user_id,
dc.channel,
dc.delivery_priority,
CASE
-- User explicitly enabled
WHEN user_enabled = TRUE THEN $5
-- Is enabled by default, no preference by user
WHEN user_enabled IS NULL AND default_enabled = TRUE THEN $5
-- User explicitly disabled (regardless of default)
WHEN user_enabled = FALSE THEN $6
-- User set no preference, default disabled
WHEN user_enabled IS NULL AND default_enabled = FALSE THEN $7
-- At this point, user set no preference and there is no
-- default set, so treat as disabled-by-default.
ELSE $7
END status,
NOW() next_attempt,
0 attempt_count
FROM
delivery_candidates dc
"#,
&notification_channels[..] as &[&str],
&notification_ids[..],
&users_raw_ids[..],
&notification_types[..] as &[&str],
NotificationDeliveryStatus::Pending.as_str(),
NotificationDeliveryStatus::SkippedPreferences.as_str(),
NotificationDeliveryStatus::SkippedDefault.as_str(),
);
query.execute(&mut **transaction).await?;
DBNotification::clear_user_notifications_cache(&users, redis).await?;
Ok(())
@@ -96,7 +183,7 @@ impl DBNotification {
exec: E,
) -> Result<Vec<DBNotification>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let notification_ids_parsed: Vec<i64> =
notification_ids.iter().map(|x| x.0).collect();
@@ -144,7 +231,60 @@ impl DBNotification {
.await
}
pub async fn get_many_user<'a, E>(
pub async fn get_all_user<'a, E>(
user_id: DBUserId,
exec: E,
) -> Result<Vec<DBNotification>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
let db_notifications = sqlx::query!(
"
SELECT n.id, n.user_id, n.name, n.text, n.link, n.created, n.read, n.type notification_type, n.body,
JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions
FROM notifications n
LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id
WHERE n.user_id = $1
GROUP BY n.id, n.user_id
",
user_id as DBUserId
)
.fetch(exec)
.map_ok(|row| {
let id = DBNotificationId(row.id);
DBNotification {
id,
user_id: DBUserId(row.user_id),
read: row.read,
created: row.created,
body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| {
if let Some(name) = row.name {
NotificationBody::LegacyMarkdown {
notification_type: row.notification_type,
name,
text: row.text.unwrap_or_default(),
link: row.link.unwrap_or_default(),
actions: serde_json::from_value(
row.actions.unwrap_or_default(),
)
.ok()
.unwrap_or_default(),
}
} else {
NotificationBody::Unknown
}
}),
}
})
.try_collect::<Vec<DBNotification>>()
.await?;
Ok(db_notifications)
}
/// Returns user notifications that are configured to be exposed on the website.
pub async fn get_many_user_exposed_on_site<'a, E>(
user_id: DBUserId,
exec: E,
redis: &RedisPool,
@@ -171,8 +311,10 @@ impl DBNotification {
JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions
FROM notifications n
LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id
INNER JOIN notifications_types nt on nt.name = n.body ->> 'type'
WHERE n.user_id = $1
GROUP BY n.id, n.user_id;
AND nt.expose_in_site_notifications = TRUE
GROUP BY n.id, n.user_id
",
user_id as DBUserId
)
@@ -274,6 +416,16 @@ impl DBNotification {
let notification_ids_parsed: Vec<i64> =
notification_ids.iter().map(|x| x.0).collect();
sqlx::query!(
"
DELETE FROM notifications_deliveries
WHERE notification_id = ANY($1)
",
&notification_ids_parsed
)
.execute(&mut **transaction)
.await?;
sqlx::query!(
"
DELETE FROM notifications_actions

View File

@@ -0,0 +1,162 @@
use super::ids::*;
use crate::database::models::DatabaseError;
use crate::models::v3::notifications::{
NotificationChannel, NotificationDeliveryStatus,
};
use chrono::{DateTime, Utc};
pub struct DBNotificationDelivery {
pub id: i64,
pub notification_id: DBNotificationId,
pub user_id: DBUserId,
pub channel: NotificationChannel,
pub delivery_priority: i32,
pub status: NotificationDeliveryStatus,
pub next_attempt: DateTime<Utc>,
pub attempt_count: i32,
}
struct NotificationDeliveryQueryResult {
id: i64,
notification_id: i64,
user_id: i64,
channel: String,
delivery_priority: i32,
status: String,
next_attempt: DateTime<Utc>,
attempt_count: i32,
}
macro_rules! select_notification_deliveries_with_predicate {
($predicate:literal $(, $($param0:expr $(, $param:expr)* $(,)?)?)?) => {
sqlx::query_as!(
NotificationDeliveryQueryResult,
r#"
SELECT
id, notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count
FROM notifications_deliveries
"#
+ $predicate
$($(, $param0 $(, $param)* )?)?
)
};
}
impl From<NotificationDeliveryQueryResult> for DBNotificationDelivery {
fn from(r: NotificationDeliveryQueryResult) -> Self {
DBNotificationDelivery {
id: r.id,
notification_id: DBNotificationId(r.notification_id),
user_id: DBUserId(r.user_id),
channel: NotificationChannel::from_str_or_default(&r.channel),
delivery_priority: r.delivery_priority,
status: NotificationDeliveryStatus::from_str_or_default(&r.status),
next_attempt: r.next_attempt,
attempt_count: r.attempt_count,
}
}
}
impl DBNotificationDelivery {
pub async fn get_all_user(
user_id: DBUserId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBNotificationDelivery>, DatabaseError> {
let user_id = user_id.0;
let results = select_notification_deliveries_with_predicate!(
"WHERE user_id = $1",
user_id
)
.fetch_all(exec)
.await?;
Ok(results.into_iter().map(|r| r.into()).collect())
}
/// Returns deliveries that should be processed next for a given channel using a row-level
/// `UPDATE` lock, barring the provided limit.
pub async fn lock_channel_processable(
channel: NotificationChannel,
limit: i64,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<DBNotificationDelivery>, DatabaseError> {
// This follows the `idx_notifications_deliveries_composite_queue` index.
Ok(select_notification_deliveries_with_predicate!(
"WHERE
status = $3
AND channel = $1
AND next_attempt <= NOW()
ORDER BY
delivery_priority DESC,
next_attempt ASC
LIMIT $2
FOR UPDATE
SKIP LOCKED
",
channel.as_str(),
limit,
NotificationDeliveryStatus::Pending.as_str()
)
.fetch_all(exec)
.await?
.into_iter()
.map(Into::into)
.collect())
}
/// Inserts the row into the table and updates its ID.
pub async fn insert(
&mut self,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<(), DatabaseError> {
let id = sqlx::query_scalar!(
"
INSERT INTO notifications_deliveries (
notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id
",
self.notification_id.0,
self.user_id.0,
self.channel.as_str(),
self.delivery_priority,
self.status.as_str(),
self.next_attempt,
self.attempt_count,
)
.fetch_one(exec)
.await?;
self.id = id;
Ok(())
}
/// Updates semantically mutable columns of the row.
pub async fn update(
&self,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
UPDATE notifications_deliveries
SET
delivery_priority = $2,
status = $3,
next_attempt = $4,
attempt_count = $5
WHERE id = $1
",
self.id,
self.delivery_priority,
self.status.as_str(),
self.next_attempt,
self.attempt_count,
)
.execute(exec)
.await?;
Ok(())
}
}

View File

@@ -0,0 +1,112 @@
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
use crate::models::v3::notifications::{NotificationChannel, NotificationType};
use serde::{Deserialize, Serialize};
const TEMPLATES_NAMESPACE: &str = "notifications_templates";
const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data";
const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes
#[derive(Clone, Serialize, Deserialize)]
pub struct NotificationTemplate {
pub id: i64,
pub channel: NotificationChannel,
pub notification_type: NotificationType,
pub subject_line: String,
pub body_fetch_url: String,
pub plaintext_fallback: String,
}
struct NotificationTemplateQueryResult {
id: i64,
channel: String,
notification_type: String,
subject_line: String,
body_fetch_url: String,
plaintext_fallback: String,
}
impl From<NotificationTemplateQueryResult> for NotificationTemplate {
fn from(r: NotificationTemplateQueryResult) -> Self {
NotificationTemplate {
id: r.id,
channel: NotificationChannel::from_str_or_default(&r.channel),
notification_type: NotificationType::from_str_or_default(
&r.notification_type,
),
subject_line: r.subject_line,
body_fetch_url: r.body_fetch_url,
plaintext_fallback: r.plaintext_fallback,
}
}
}
impl NotificationTemplate {
pub async fn list_channel(
channel: NotificationChannel,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
redis: &RedisPool,
) -> Result<Vec<NotificationTemplate>, DatabaseError> {
let mut redis = redis.connect().await?;
let maybe_cached_templates = redis
.get_deserialized_from_json(TEMPLATES_NAMESPACE, channel.as_str())
.await?;
if let Some(cached) = maybe_cached_templates {
return Ok(cached);
}
let results = sqlx::query_as!(
NotificationTemplateQueryResult,
r#"
SELECT * FROM notifications_templates WHERE channel = $1
"#,
channel.as_str(),
)
.fetch_all(exec)
.await?;
let templates = results.into_iter().map(Into::into).collect();
redis
.set_serialized_to_json(
TEMPLATES_NAMESPACE,
channel.as_str(),
&templates,
None,
)
.await?;
Ok(templates)
}
pub async fn get_cached_html_data(
&self,
redis: &RedisPool,
) -> Result<Option<String>, DatabaseError> {
let mut redis = redis.connect().await?;
redis
.get_deserialized_from_json(
TEMPLATES_HTML_DATA_NAMESPACE,
&self.id.to_string(),
)
.await
}
pub async fn set_cached_html_data(
&self,
data: String,
redis: &RedisPool,
) -> Result<(), DatabaseError> {
let mut redis = redis.connect().await?;
redis
.set_serialized_to_json(
TEMPLATES_HTML_DATA_NAMESPACE,
&self.id.to_string(),
&data,
Some(HTML_DATA_CACHE_EXPIRY),
)
.await
}
}

View File

@@ -0,0 +1,72 @@
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
use crate::models::v3::notifications::NotificationType;
use serde::{Deserialize, Serialize};
const NOTIFICATION_TYPES_NAMESPACE: &str = "notification_types";
#[derive(Serialize, Deserialize)]
pub struct NotificationTypeItem {
pub name: NotificationType,
pub delivery_priority: i32,
pub expose_in_user_preferences: bool,
pub expose_in_site_notifications: bool,
}
struct NotificationTypeQueryResult {
name: String,
delivery_priority: i32,
expose_in_user_preferences: bool,
expose_in_site_notifications: bool,
}
impl From<NotificationTypeQueryResult> for NotificationTypeItem {
fn from(r: NotificationTypeQueryResult) -> Self {
NotificationTypeItem {
name: NotificationType::from_str_or_default(&r.name),
delivery_priority: r.delivery_priority,
expose_in_user_preferences: r.expose_in_user_preferences,
expose_in_site_notifications: r.expose_in_site_notifications,
}
}
}
impl NotificationTypeItem {
pub async fn list<'a, E>(
exec: E,
redis: &RedisPool,
) -> Result<Vec<NotificationTypeItem>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let mut redis = redis.connect().await?;
let cached_types = redis
.get_deserialized_from_json(NOTIFICATION_TYPES_NAMESPACE, "all")
.await?;
if let Some(types) = cached_types {
return Ok(types);
}
let results = sqlx::query_as!(
NotificationTypeQueryResult,
"SELECT * FROM notifications_types"
)
.fetch_all(exec)
.await?;
let types = results.into_iter().map(Into::into).collect();
redis
.set_serialized_to_json(
NOTIFICATION_TYPES_NAMESPACE,
"all",
&types,
None,
)
.await?;
Ok(types)
}
}

View File

@@ -233,7 +233,7 @@ impl DBUser {
exec: E,
) -> Result<Option<DBUserId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let user = sqlx::query!(
"
@@ -254,7 +254,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<DBUserId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let users = sqlx::query!(
"
@@ -270,13 +270,32 @@ impl DBUser {
Ok(users)
}
/// Returns `false` if any of the specified user IDs do not exist.
pub async fn exists_many<'a, E>(
user_ids: &[DBUserId],
exec: E,
) -> Result<bool, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
let ids = user_ids.iter().map(|x| x.0).collect::<Vec<_>>();
let count = sqlx::query_scalar!(
r#"SELECT COUNT(*) "count!" FROM users WHERE id = ANY($1)"#,
&ids
)
.fetch_one(exec)
.await?;
Ok(count as usize == user_ids.len())
}
pub async fn get_projects<'a, E>(
user_id: DBUserId,
exec: E,
redis: &RedisPool,
) -> Result<Vec<DBProjectId>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
@@ -324,7 +343,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<DBOrganizationId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
@@ -349,7 +368,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<DBCollectionId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
@@ -373,7 +392,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<DBProjectId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
@@ -397,7 +416,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<DBReportId>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;
@@ -421,7 +440,7 @@ impl DBUser {
exec: E,
) -> Result<Vec<String>, sqlx::Error>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
{
use futures::stream::TryStreamExt;

View File

@@ -0,0 +1,111 @@
use super::ids::*;
use crate::database::models::DatabaseError;
use crate::models::v3::notifications::{NotificationChannel, NotificationType};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct UserNotificationPreference {
pub id: i64,
pub user_id: Option<DBUserId>,
pub channel: NotificationChannel,
pub notification_type: NotificationType,
pub enabled: bool,
}
struct UserNotificationPreferenceQueryResult {
id: i64,
user_id: Option<i64>,
channel: String,
notification_type: String,
enabled: bool,
}
impl From<UserNotificationPreferenceQueryResult>
for UserNotificationPreference
{
fn from(r: UserNotificationPreferenceQueryResult) -> Self {
UserNotificationPreference {
id: r.id,
user_id: r.user_id.map(DBUserId),
channel: NotificationChannel::from_str_or_default(&r.channel),
notification_type: NotificationType::from_str_or_default(
&r.notification_type,
),
enabled: r.enabled,
}
}
}
impl UserNotificationPreference {
pub async fn get_user_or_default(
user_id: DBUserId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserNotificationPreference>, DatabaseError> {
Self::get_many_users_or_default(&[user_id], exec).await
}
pub async fn get_many_users_or_default(
user_ids: &[DBUserId],
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserNotificationPreference>, DatabaseError> {
let results = sqlx::query!(
r#"
SELECT
COALESCE(unp.id, dnp.id) "id!",
unp.user_id,
dnp.channel "channel!",
dnp.notification_type "notification_type!",
COALESCE(unp.enabled, dnp.enabled, false) "enabled!"
FROM users_notifications_preferences dnp
LEFT JOIN users_notifications_preferences unp
ON unp.channel = dnp.channel
AND unp.notification_type = dnp.notification_type
AND unp.user_id = ANY($1::bigint[])
"#,
&user_ids.iter().map(|x| x.0).collect::<Vec<_>>(),
)
.fetch_all(exec)
.await?;
let preferences = results
.into_iter()
.map(|r| UserNotificationPreference {
id: r.id,
user_id: r.user_id.map(DBUserId),
channel: NotificationChannel::from_str_or_default(&r.channel),
notification_type: NotificationType::from_str_or_default(
&r.notification_type,
),
enabled: r.enabled,
})
.collect();
Ok(preferences)
}
/// Inserts the row into the table and updates its ID.
pub async fn insert(
&mut self,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<(), DatabaseError> {
let id = sqlx::query_scalar!(
"
INSERT INTO users_notifications_preferences (
user_id, channel, notification_type, enabled
)
VALUES ($1, $2, $3, $4)
RETURNING id
",
self.user_id.map(|x| x.0),
self.channel.as_str(),
self.notification_type.as_str(),
self.enabled,
)
.fetch_one(exec)
.await?;
self.id = id;
Ok(())
}
}