diff --git a/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json b/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json new file mode 100644 index 00000000..75aa8478 --- /dev/null +++ b/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n COALESCE(SUM(pv.amount), 0.0) sum\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Int8Array", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114" +} diff --git a/apps/labrinth/.sqlx/query-501a52c136866b024ab42317ec156a7021063a15d7a4eabbfb2c646a7b3becd0.json b/apps/labrinth/.sqlx/query-501a52c136866b024ab42317ec156a7021063a15d7a4eabbfb2c646a7b3becd0.json new file mode 100644 index 00000000..d41182d5 --- /dev/null +++ b/apps/labrinth/.sqlx/query-501a52c136866b024ab42317ec156a7021063a15d7a4eabbfb2c646a7b3becd0.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n\t\t\tSELECT\n\t\t\t id,\n\t\t\t user_id,\n\t\t\t date_available\n\t\t\tFROM payouts_values_notifications\n\t\t\tWHERE\n\t\t\t notified = FALSE\n\t\t\t AND date_available <= NOW()\n\t\t\tFOR UPDATE SKIP LOCKED\n\t\t\tLIMIT $1\n\t\t\t", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "user_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "date_available", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "501a52c136866b024ab42317ec156a7021063a15d7a4eabbfb2c646a7b3becd0" +} diff --git a/apps/labrinth/.sqlx/query-603c1109f8c5a9a5b45b3f531fcb6b597ac01c831fbbcb296fc5ba08cc622482.json b/apps/labrinth/.sqlx/query-603c1109f8c5a9a5b45b3f531fcb6b597ac01c831fbbcb296fc5ba08cc622482.json new file mode 100644 index 00000000..3d0c4126 --- /dev/null +++ b/apps/labrinth/.sqlx/query-603c1109f8c5a9a5b45b3f531fcb6b597ac01c831fbbcb296fc5ba08cc622482.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n\t\tINSERT INTO payouts_values_notifications (date_available, user_id, notified)\n\t\tSELECT DISTINCT date_available, user_id, false notified\n\t\tFROM payouts_values\n\t\tWHERE date_available > NOW()\n\t\tON CONFLICT (date_available, user_id) DO NOTHING\n\t\t", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "603c1109f8c5a9a5b45b3f531fcb6b597ac01c831fbbcb296fc5ba08cc622482" +} diff --git a/apps/labrinth/.sqlx/query-80d8a5b72bf5381cb228a67d2228c014bd29995af8409ea66155820617f7172a.json b/apps/labrinth/.sqlx/query-80d8a5b72bf5381cb228a67d2228c014bd29995af8409ea66155820617f7172a.json new file mode 100644 index 00000000..26001738 --- /dev/null +++ b/apps/labrinth/.sqlx/query-80d8a5b72bf5381cb228a67d2228c014bd29995af8409ea66155820617f7172a.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT name, icon_url FROM mods WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "icon_url", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "80d8a5b72bf5381cb228a67d2228c014bd29995af8409ea66155820617f7172a" +} diff --git a/apps/labrinth/.sqlx/query-a19b8af8d58d30f731951dcec8081b2ca8a969369674ea7c3c72bd78c67d06b9.json b/apps/labrinth/.sqlx/query-a19b8af8d58d30f731951dcec8081b2ca8a969369674ea7c3c72bd78c67d06b9.json new file mode 100644 index 00000000..2723f9cf --- /dev/null +++ b/apps/labrinth/.sqlx/query-a19b8af8d58d30f731951dcec8081b2ca8a969369674ea7c3c72bd78c67d06b9.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n\t\t\tUPDATE payouts_values_notifications\n\t\t\tSET notified = TRUE\n\t\t\tWHERE id = ANY($1)\n\t\t\t", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4Array" + ] + }, + "nullable": [] + }, + "hash": "a19b8af8d58d30f731951dcec8081b2ca8a969369674ea7c3c72bd78c67d06b9" +} diff --git a/apps/labrinth/.sqlx/query-aa60ab1baa25beefdf6dff23ab6f4a08619011ced36c34d7c251af7fe7b9ccc5.json b/apps/labrinth/.sqlx/query-aa60ab1baa25beefdf6dff23ab6f4a08619011ced36c34d7c251af7fe7b9ccc5.json new file mode 100644 index 00000000..fc4eba74 --- /dev/null +++ b/apps/labrinth/.sqlx/query-aa60ab1baa25beefdf6dff23ab6f4a08619011ced36c34d7c251af7fe7b9ccc5.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n r.created,\n COALESCE(m.name, v.version_number, u.username, 'unknown') \"title!\"\n FROM reports r\n LEFT JOIN mods m ON r.mod_id = m.id\n LEFT JOIN versions v ON r.version_id = v.id\n LEFT JOIN users u ON r.user_id = u.id\n WHERE r.id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "title!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "aa60ab1baa25beefdf6dff23ab6f4a08619011ced36c34d7c251af7fe7b9ccc5" +} diff --git a/apps/labrinth/.sqlx/query-da218303ffcd4c45dbad7e90fd649e2eae4dd8d0fba64bc28b4af4f22eef97e8.json b/apps/labrinth/.sqlx/query-da218303ffcd4c45dbad7e90fd649e2eae4dd8d0fba64bc28b4af4f22eef97e8.json new file mode 100644 index 00000000..18499e6a --- /dev/null +++ b/apps/labrinth/.sqlx/query-da218303ffcd4c45dbad7e90fd649e2eae4dd8d0fba64bc28b4af4f22eef97e8.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COALESCE(m.name, v.version_number, u.username, 'unknown') \"title!\"\n FROM reports r\n LEFT JOIN mods m ON r.mod_id = m.id\n LEFT JOIN versions v ON r.version_id = v.id\n LEFT JOIN users u ON r.user_id = u.id\n WHERE r.id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "title!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "da218303ffcd4c45dbad7e90fd649e2eae4dd8d0fba64bc28b4af4f22eef97e8" +} diff --git a/apps/labrinth/migrations/20250916171700_payout-values-available-notification.sql b/apps/labrinth/migrations/20250916171700_payout-values-available-notification.sql new file mode 100644 index 00000000..fabc52fb --- /dev/null +++ b/apps/labrinth/migrations/20250916171700_payout-values-available-notification.sql @@ -0,0 +1,282 @@ +CREATE TABLE payouts_values_notifications ( + id SERIAL PRIMARY KEY, + date_available TIMESTAMPTZ NOT NULL, + user_id BIGINT NOT NULL REFERENCES users (id), + notified BOOLEAN NOT NULL +); + +CREATE UNIQUE INDEX payouts_values_notifications_date_available_user_id_idx ON payouts_values_notifications ( + date_available, + user_id +); + +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('payout_available', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('pat_created', 3, FALSE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('moderation_message_received', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('report_status_updated', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('report_submitted', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('project_status_approved', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('project_status_neutral', 1, TRUE, FALSE); +INSERT INTO notifications_types + (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) +VALUES ('project_transferred', 2, FALSE, FALSE); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'payout_available', + 'Revenue available to withdraw!', + 'https://modrinth.com/email/payout-available', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'The ${payout.amount} earned during {payout.period} has been processed and is now available to withdraw from your account.', + CHR(10), + CHR(10), + 'View your revenue dashboard: https://modrinth.com/dashboard/revenue', + CHR(10), + CHR(10), + 'If you have any questions about the creator rewards program, please contact support through the Support Portal at https://support.modrinth.com/ or by replying to this email.', + CHR(10), + CHR(10), + 'Thank you for being a creator on Modrinth!' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'pat_created', + 'A new personal access token has been created', + 'https://modrinth.com/email/personal-access-token-created', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'A new personal access token, {newpat.token_name}, has been added to your account.', + CHR(10), + CHR(10), + 'If you did not create this token, please contact us immediately by replying to this email or through our Support Portal.', + CHR(10), + CHR(10), + 'Support Portal: https://support.modrinth.com/' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'moderation_message_received', + 'New message from moderators on your project', + 'https://modrinth.com/email/moderation-thread-message-received', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Modrinth''s moderation team has left a message on your project, {project.name}.', + CHR(10), + CHR(10), + 'Please sign in to view the message and reply if requested. It''s important to address feedback from the moderation team promptly.', + CHR(10), + 'Your project''s moderation thread: https://modrinth.com/project/{project.id}/moderation', + CHR(10), + CHR(10), + 'Thank you for publishing on Modrinth!' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'moderation_message_received', + 'New message from moderators on your project', + 'https://modrinth.com/email/moderation-thread-message-received', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Modrinth''s moderation team has left a message on your project, {project.name}.', + CHR(10), + CHR(10), + 'Please sign in to view the message and reply if requested. It''s important to address feedback from the moderation team promptly.', + CHR(10), + 'Your project''s moderation thread: https://modrinth.com/project/{project.id}/moderation', + CHR(10), + CHR(10), + 'Thank you for publishing on Modrinth!' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'report_status_updated', + 'Your report has been updated', + 'https://modrinth.com/email/report-updated', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Your report of {report.title} from {report.date} has been updated by our moderation team.', + CHR(10), + CHR(10), + 'You can view the full report thread to see the update. If you have more information to add, please reply in the report thread for our moderators to review.', + CHR(10), + CHR(10), + 'Thank you for helping keep Modrinth safe and welcoming for everyone.' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'report_submitted', + 'Your report has been submitted', + 'https://modrinth.com/email/report-submitted', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'We''ve received your report of {report.title} and our moderation team will review it shortly.', + CHR(10), + CHR(10), + 'Our team takes all reports seriously and will investigate according to our Content Rules, Terms of Service and Copyright Policy. You''ll receive an email update once we''ve completed our review.', + CHR(10), + CHR(10), + 'If you have any additional information about this report, you can view it here: https://modrinth.com/dashboard/report/{newreport.id}', + CHR(10), + CHR(10), + 'Thank you for helping keep Modrinth safe and welcoming for everyone.' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'project_status_approved', + 'Your project, {project.name}, has been approved 🎉', + 'https://modrinth.com/email/project-approved', + CONCAT( + 'Congratulations {user.name},', + CHR(10), + CHR(10), + 'Your project {project.name} has been approved by the moderation team!', + CHR(10), + CHR(10), + 'View your project here: https://modrinth.com/project/{project.id}', + CHR(10), + CHR(10), + 'If you have questions or believe something isn''t correct, you can reply to this email or reach out via the Support Portal.', + CHR(10), + CHR(10), + 'Thank you for sharing your work with the Modrinth community!' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'project_status_neutral', + 'Your project''s status has been updated', + 'https://modrinth.com/email/project-status-updated-neutral', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Your project''s status has been changed from {project.oldstatus} to {project.newstatus} by the moderation team. Please review any messages left in the moderation thread which might be relevant to why the status was changed.', + CHR(10), + CHR(10), + 'View your project here: https://modrinth.com/project/{project.id}/moderation', + CHR(10), + CHR(10), + 'If you believe this status was applied in error, you can reply in the moderation thread or contact support through our Support Portal or by replying to this email.', + CHR(10), + CHR(10), + 'Thank you for publishing on Modrinth!' + ) + ); + +INSERT INTO notifications_templates + (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES + ( + 'email', + 'project_transferred', + 'Project ownership transferred', + 'https://modrinth.com/email/project-ownership-transferred', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'The ownership of {project.name} has been successfully transferred to the Modrinth {new_owner.type_capitalized} {new_owner.name}.', + CHR(10), + CHR(10), + 'View the project here: https://modrinth.com/project/{project.id}', + CHR(10), + CHR(10), + 'If you did not initiate this transfer, please contact support immediately through the Support Portal or by replying to this email.' + ) + ); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'payout_available', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'pat_created', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'moderation_message_received', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'report_status_updated', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'report_submitted', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'project_status_approved', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'project_status_neutral', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES + (NULL, 'email', 'project_transferred', TRUE); diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index c79d3d6e..2da7ba9b 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -1,7 +1,8 @@ use crate::database::redis::RedisPool; use crate::queue::email::EmailQueue; use crate::queue::payouts::{ - PayoutsQueue, insert_bank_balances_and_webhook, process_payout, + PayoutsQueue, index_payouts_notifications, + insert_bank_balances_and_webhook, process_payout, }; use crate::search::indexing::index_projects; use crate::{database, search}; @@ -38,7 +39,7 @@ impl BackgroundTask { IndexSearch => index_search(pool, redis_pool, search_config).await, ReleaseScheduled => release_scheduled(pool).await, UpdateVersions => update_versions(pool, redis_pool).await, - Payouts => payouts(pool, clickhouse).await, + Payouts => payouts(pool, clickhouse, redis_pool).await, IndexBilling => { crate::routes::internal::billing::index_billing( stripe_client, @@ -147,12 +148,19 @@ pub async fn update_versions( pub async fn payouts( pool: sqlx::Pool, clickhouse: clickhouse::Client, + redis_pool: RedisPool, ) { info!("Started running payouts"); let result = process_payout(&pool, &clickhouse).await; if let Err(e) = result { warn!("Payouts run failed: {:?}", e); } + + let result = index_payouts_notifications(&pool, &redis_pool).await; + if let Err(e) = result { + warn!("Payouts notifications indexing failed: {:?}", e); + } + info!("Done running payouts"); } diff --git a/apps/labrinth/src/database/models/ids.rs b/apps/labrinth/src/database/models/ids.rs index 795862ce..559faf58 100644 --- a/apps/labrinth/src/database/models/ids.rs +++ b/apps/labrinth/src/database/models/ids.rs @@ -60,12 +60,15 @@ macro_rules! generate_bulk_ids { count: usize, con: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> Result, DatabaseError> { - let mut rng = rand::thread_rng(); let mut retry_count = 0; // Check if ID is unique loop { - let base = random_base62_rng_range(&mut rng, 1, 10) as i64; + // We re-acquire a thread-local RNG handle for each uniqueness loop for + // the bulk generator future to be `Send + Sync`. + let base = + random_base62_rng_range(&mut rand::thread_rng(), 1, 10) + as i64; let ids = (0..count).map(|x| base + x as i64).collect::>(); diff --git a/apps/labrinth/src/database/models/mod.rs b/apps/labrinth/src/database/models/mod.rs index 7c5b5e60..925e4ac5 100644 --- a/apps/labrinth/src/database/models/mod.rs +++ b/apps/labrinth/src/database/models/mod.rs @@ -19,6 +19,7 @@ pub mod oauth_token_item; pub mod organization_item; pub mod pat_item; pub mod payout_item; +pub mod payouts_values_notifications; pub mod product_item; pub mod project_item; pub mod report_item; diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index eb90b4e9..deabbdf0 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -2,6 +2,7 @@ use super::ids::*; use crate::database::{models::DatabaseError, redis::RedisPool}; use crate::models::notifications::{ NotificationBody, NotificationChannel, NotificationDeliveryStatus, + NotificationType, }; use chrono::{DateTime, Utc}; use futures::TryStreamExt; @@ -41,6 +42,71 @@ impl NotificationBuilder { self.insert_many(vec![user], transaction, redis).await } + pub async fn insert_many_payout_notifications( + users: Vec, + dates_available: Vec>, + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, + redis: &RedisPool, + ) -> Result<(), DatabaseError> { + let notification_ids = + generate_many_notification_ids(users.len(), &mut *transaction) + .await?; + + let users_raw_ids = users.iter().map(|x| x.0).collect::>(); + let notification_ids = + notification_ids.iter().map(|x| x.0).collect::>(); + + sqlx::query!( + " + WITH + period_payouts AS ( + SELECT + ids.notification_id, + ids.user_id, + ids.date_available, + COALESCE(SUM(pv.amount), 0.0) sum + FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available) + LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available + GROUP BY ids.user_id, ids.notification_id, ids.date_available + ) + INSERT INTO notifications ( + id, user_id, body + ) + SELECT + notification_id id, + user_id, + JSONB_BUILD_OBJECT( + 'type', 'payout_available', + 'date_available', to_jsonb(date_available), + 'amount', to_jsonb(sum) + ) body + FROM period_payouts + ", + ¬ification_ids[..], + &users_raw_ids[..], + &dates_available[..], + ) + .execute(&mut **transaction) + .await?; + + let notification_types = notification_ids + .iter() + .map(|_| NotificationType::PayoutAvailable.as_str()) + .collect::>(); + + NotificationBuilder::insert_many_deliveries( + transaction, + redis, + ¬ification_ids, + &users_raw_ids, + ¬ification_types, + &users, + ) + .await?; + + Ok(()) + } + pub async fn insert_many( &self, users: Vec, @@ -80,6 +146,27 @@ impl NotificationBuilder { .map(|_| self.body.notification_type().as_str()) .collect::>(); + NotificationBuilder::insert_many_deliveries( + transaction, + redis, + ¬ification_ids, + &users_raw_ids, + ¬ification_types, + &users, + ) + .await?; + + Ok(()) + } + + async fn insert_many_deliveries( + transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, + redis: &RedisPool, + notification_ids: &[i64], + users_raw_ids: &[i64], + notification_types: &[&str], + users: &[DBUserId], + ) -> Result<(), DatabaseError> { let notification_channels = NotificationChannel::list() .iter() .map(|x| x.as_str()) @@ -159,7 +246,7 @@ impl NotificationBuilder { query.execute(&mut **transaction).await?; - DBNotification::clear_user_notifications_cache(&users, redis).await?; + DBNotification::clear_user_notifications_cache(users, redis).await?; Ok(()) } diff --git a/apps/labrinth/src/database/models/payouts_values_notifications.rs b/apps/labrinth/src/database/models/payouts_values_notifications.rs new file mode 100644 index 00000000..0483d2ab --- /dev/null +++ b/apps/labrinth/src/database/models/payouts_values_notifications.rs @@ -0,0 +1,89 @@ +use crate::database::models::{DBUserId, DatabaseError}; +use chrono::{DateTime, Utc}; + +pub struct PayoutsValuesNotification { + pub id: i32, + pub user_id: DBUserId, + pub date_available: DateTime, +} + +impl PayoutsValuesNotification { + pub async fn unnotified_users_with_available_payouts_with_limit( + exec: impl sqlx::PgExecutor<'_>, + limit: i64, + ) -> Result, DatabaseError> { + Ok(sqlx::query_as!( + QueryResult, + " + SELECT + id, + user_id, + date_available + FROM payouts_values_notifications + WHERE + notified = FALSE + AND date_available <= NOW() + FOR UPDATE SKIP LOCKED + LIMIT $1 + ", + limit, + ) + .fetch_all(exec) + .await? + .into_iter() + .map(Into::into) + .collect()) + } + + pub async fn set_notified_many( + ids: &[i32], + exec: impl sqlx::PgExecutor<'_>, + ) -> Result<(), DatabaseError> { + sqlx::query!( + " + UPDATE payouts_values_notifications + SET notified = TRUE + WHERE id = ANY($1) + ", + &ids[..], + ) + .execute(exec) + .await?; + + Ok(()) + } +} + +pub async fn synchronize_future_payout_values( + exec: impl sqlx::PgExecutor<'_>, +) -> Result<(), DatabaseError> { + sqlx::query!( + " + INSERT INTO payouts_values_notifications (date_available, user_id, notified) + SELECT DISTINCT date_available, user_id, false notified + FROM payouts_values + WHERE date_available > NOW() + ON CONFLICT (date_available, user_id) DO NOTHING + ", + ) + .execute(exec) + .await?; + + Ok(()) +} + +struct QueryResult { + id: i32, + user_id: i64, + date_available: DateTime, +} + +impl From for PayoutsValuesNotification { + fn from(result: QueryResult) -> Self { + PayoutsValuesNotification { + id: result.id, + user_id: DBUserId(result.user_id), + date_available: result.date_available, + } + } +} diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index dcdbaf9e..f58c5aaf 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -148,11 +148,13 @@ pub fn app_setup( let pool_ref = pool.clone(); let client_ref = clickhouse.clone(); + let redis_pool_ref = redis_pool.clone(); scheduler.run(Duration::from_secs(60 * 60 * 6), move || { let pool_ref = pool_ref.clone(); let client_ref = client_ref.clone(); + let redis_ref = redis_pool_ref.clone(); async move { - background_task::payouts(pool_ref, client_ref).await; + background_task::payouts(pool_ref, client_ref, redis_ref).await; } }); diff --git a/apps/labrinth/src/models/v2/notifications.rs b/apps/labrinth/src/models/v2/notifications.rs index 16a94137..664cf4b0 100644 --- a/apps/labrinth/src/models/v2/notifications.rs +++ b/apps/labrinth/src/models/v2/notifications.rs @@ -96,6 +96,36 @@ pub enum LegacyNotificationBody { amount: String, service: String, }, + PatCreated { + token_name: String, + }, + ModerationMessageReceived { + project_id: ProjectId, + }, + ReportStatusUpdated { + report_id: ReportId, + }, + ReportSubmitted { + report_id: ReportId, + }, + ProjectStatusApproved { + project_id: ProjectId, + }, + ProjectStatusNeutral { + project_id: ProjectId, + old_status: ProjectStatus, + new_status: ProjectStatus, + }, + ProjectTransferred { + project_id: ProjectId, + // Store only the raw identifiers in legacy body + new_owner_user_id: Option, + new_owner_organization_id: Option, + }, + PayoutAvailable { + amount: f64, + date_available: DateTime, + }, Unknown, } @@ -117,6 +147,27 @@ impl LegacyNotification { NotificationBody::ModeratorMessage { .. } => { Some("moderator_message".to_string()) } + NotificationBody::PatCreated { .. } => { + Some("pat_created".to_string()) + } + NotificationBody::ModerationMessageReceived { .. } => { + Some("moderation_message_received".to_string()) + } + NotificationBody::ReportStatusUpdated { .. } => { + Some("report_status_updated".to_string()) + } + NotificationBody::ReportSubmitted { .. } => { + Some("report_submitted".to_string()) + } + NotificationBody::ProjectStatusApproved { .. } => { + Some("project_status_approved".to_string()) + } + NotificationBody::ProjectStatusNeutral { .. } => { + Some("project_status_neutral".to_string()) + } + NotificationBody::ProjectTransferred { .. } => { + Some("project_transferred".to_string()) + } NotificationBody::ResetPassword { .. } => { Some("reset_password".to_string()) } @@ -147,6 +198,9 @@ impl LegacyNotification { NotificationBody::PaymentFailed { .. } => { Some("payment_failed".to_string()) } + NotificationBody::PayoutAvailable { .. } => { + Some("payout_available".to_string()) + } NotificationBody::LegacyMarkdown { notification_type, .. } => notification_type.clone(), @@ -203,6 +257,46 @@ impl LegacyNotification { project_id, report_id, }, + NotificationBody::PatCreated { token_name } => { + LegacyNotificationBody::PatCreated { token_name } + } + NotificationBody::ModerationMessageReceived { project_id } => { + LegacyNotificationBody::ModerationMessageReceived { project_id } + } + NotificationBody::ReportStatusUpdated { report_id } => { + LegacyNotificationBody::ReportStatusUpdated { report_id } + } + NotificationBody::ReportSubmitted { report_id } => { + LegacyNotificationBody::ReportSubmitted { report_id } + } + NotificationBody::ProjectStatusApproved { project_id } => { + LegacyNotificationBody::ProjectStatusApproved { project_id } + } + NotificationBody::ProjectStatusNeutral { + project_id, + old_status, + new_status, + } => LegacyNotificationBody::ProjectStatusNeutral { + project_id, + old_status, + new_status, + }, + NotificationBody::ProjectTransferred { + project_id, + new_owner_user_id, + new_owner_organization_id, + } => LegacyNotificationBody::ProjectTransferred { + project_id, + new_owner_user_id, + new_owner_organization_id, + }, + NotificationBody::PayoutAvailable { + amount, + date_available, + } => LegacyNotificationBody::PayoutAvailable { + amount, + date_available, + }, NotificationBody::LegacyMarkdown { notification_type, name, diff --git a/apps/labrinth/src/models/v3/notifications.rs b/apps/labrinth/src/models/v3/notifications.rs index 045be3cf..b74ab8c5 100644 --- a/apps/labrinth/src/models/v3/notifications.rs +++ b/apps/labrinth/src/models/v3/notifications.rs @@ -46,6 +46,14 @@ pub enum NotificationType { PasswordRemoved, EmailChanged, PaymentFailed, + PatCreated, + ModerationMessageReceived, + ReportStatusUpdated, + ReportSubmitted, + ProjectStatusApproved, + ProjectStatusNeutral, + ProjectTransferred, + PayoutAvailable, Unknown, } @@ -68,6 +76,18 @@ impl NotificationType { NotificationType::PasswordRemoved => "password_removed", NotificationType::EmailChanged => "email_changed", NotificationType::PaymentFailed => "payment_failed", + NotificationType::PatCreated => "pat_created", + NotificationType::ModerationMessageReceived => { + "moderation_message_received" + } + NotificationType::ReportStatusUpdated => "report_status_updated", + NotificationType::ReportSubmitted => "report_submitted", + NotificationType::ProjectStatusApproved => { + "project_status_approved" + } + NotificationType::ProjectStatusNeutral => "project_status_neutral", + NotificationType::ProjectTransferred => "project_transferred", + NotificationType::PayoutAvailable => "payout_available", NotificationType::Unknown => "unknown", } } @@ -90,6 +110,18 @@ impl NotificationType { "password_removed" => NotificationType::PasswordRemoved, "email_changed" => NotificationType::EmailChanged, "payment_failed" => NotificationType::PaymentFailed, + "pat_created" => NotificationType::PatCreated, + "moderation_message_received" => { + NotificationType::ModerationMessageReceived + } + "report_status_updated" => NotificationType::ReportStatusUpdated, + "report_submitted" => NotificationType::ReportSubmitted, + "project_status_approved" => { + NotificationType::ProjectStatusApproved + } + "project_status_neutral" => NotificationType::ProjectStatusNeutral, + "project_transferred" => NotificationType::ProjectTransferred, + "payout_available" => NotificationType::PayoutAvailable, "unknown" => NotificationType::Unknown, _ => NotificationType::Unknown, } @@ -120,6 +152,7 @@ pub enum NotificationBody { old_status: ProjectStatus, new_status: ProjectStatus, }, + /// This is for website notifications only. Email notifications have `ModerationMessageReceived`. ModeratorMessage { thread_id: ThreadId, message_id: ThreadMessageId, @@ -127,6 +160,33 @@ pub enum NotificationBody { project_id: Option, report_id: Option, }, + PatCreated { + token_name: String, + }, + /// This differs from ModeratorMessage as this notification is only for project threads and + /// email notifications, not for site notifications. + ModerationMessageReceived { + project_id: ProjectId, + }, + ReportStatusUpdated { + report_id: ReportId, + }, + ReportSubmitted { + report_id: ReportId, + }, + ProjectStatusApproved { + project_id: ProjectId, + }, + ProjectStatusNeutral { + project_id: ProjectId, + old_status: ProjectStatus, + new_status: ProjectStatus, + }, + ProjectTransferred { + project_id: ProjectId, + new_owner_user_id: Option, + new_owner_organization_id: Option, + }, LegacyMarkdown { notification_type: Option, name: String, @@ -158,6 +218,10 @@ pub enum NotificationBody { amount: String, service: String, }, + PayoutAvailable { + date_available: DateTime, + amount: f64, + }, Unknown, } @@ -177,6 +241,25 @@ impl NotificationBody { NotificationBody::ModeratorMessage { .. } => { NotificationType::ModeratorMessage } + NotificationBody::PatCreated { .. } => NotificationType::PatCreated, + NotificationBody::ModerationMessageReceived { .. } => { + NotificationType::ModerationMessageReceived + } + NotificationBody::ReportStatusUpdated { .. } => { + NotificationType::ReportStatusUpdated + } + NotificationBody::ReportSubmitted { .. } => { + NotificationType::ReportSubmitted + } + NotificationBody::ProjectStatusApproved { .. } => { + NotificationType::ProjectStatusApproved + } + NotificationBody::ProjectStatusNeutral { .. } => { + NotificationType::ProjectStatusNeutral + } + NotificationBody::ProjectTransferred { .. } => { + NotificationType::ProjectTransferred + } NotificationBody::LegacyMarkdown { .. } => { NotificationType::LegacyMarkdown } @@ -210,6 +293,9 @@ impl NotificationBody { NotificationBody::PaymentFailed { .. } => { NotificationType::PaymentFailed } + NotificationBody::PayoutAvailable { .. } => { + NotificationType::PayoutAvailable + } NotificationBody::Unknown => NotificationType::Unknown, } } @@ -323,6 +409,46 @@ impl From for Notification { }, vec![], ), + // The notifications from here to down below are listed with messages for completeness' sake, + // though they should never be sent via site notifications. This should be disabled via database + // options. Messages should be reviewed and worded better if we want to distribute these notifications + // via the site. + NotificationBody::PatCreated { token_name } => ( + "New personal access token created".to_string(), + format!("Your personal access token '{token_name}' was created."), + "#".to_string(), + vec![], + ), + NotificationBody::ReportStatusUpdated { .. } => ( + "Report status updated".to_string(), + "A report you are involved in has been updated.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::ReportSubmitted { .. } => ( + "Report submitted".to_string(), + "Your report was submitted successfully.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::ProjectStatusApproved { .. } => ( + "Project approved".to_string(), + "Your project has been approved.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::ProjectStatusNeutral { .. } => ( + "Project status updated".to_string(), + "Your project status has been updated.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::ProjectTransferred { .. } => ( + "Project ownership transferred".to_string(), + "A project's ownership has been transferred.".to_string(), + "#".to_string(), + vec![], + ), // Don't expose the `flow` field NotificationBody::ResetPassword { .. } => ( "Password reset requested".to_string(), @@ -342,10 +468,6 @@ impl From for Notification { link.clone(), actions.clone().into_iter().collect(), ), - // The notifications from here to down below are listed with messages for completeness' sake, - // though they should never be sent via site notifications. This should be disabled via database - // options. Messages should be reviewed and worded better if we want to distribute these notifications - // via the site. NotificationBody::PaymentFailed { .. } => ( "Payment failed".to_string(), "A payment on your account failed. Please update your billing information.".to_string(), @@ -400,6 +522,18 @@ impl From for Notification { "#".to_string(), vec![], ), + NotificationBody::PayoutAvailable { .. } => ( + "Payout available".to_string(), + "A payout is available!".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::ModerationMessageReceived { .. } => ( + "New message in moderation thread".to_string(), + "You have a new message in a moderation thread.".to_string(), + "#".to_string(), + vec![], + ), NotificationBody::Unknown => { ("".to_string(), "".to_string(), "#".to_string(), vec![]) } diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs index ddaeeacf..e249af0c 100644 --- a/apps/labrinth/src/queue/email.rs +++ b/apps/labrinth/src/queue/email.rs @@ -307,7 +307,7 @@ impl EmailQueue { }; let message = templates::build_email( - &mut **txn, + txn, &self.redis, &self.client, user_id, diff --git a/apps/labrinth/src/queue/email/templates.rs b/apps/labrinth/src/queue/email/templates.rs index 4e269e4d..9c824da1 100644 --- a/apps/labrinth/src/queue/email/templates.rs +++ b/apps/labrinth/src/queue/email/templates.rs @@ -1,11 +1,13 @@ use super::MailError; -use crate::database::models::DBUser; -use crate::database::models::DatabaseError; use crate::database::models::ids::*; use crate::database::models::notifications_template_item::NotificationTemplate; +use crate::database::models::{ + DBOrganization, DBProject, DBUser, DatabaseError, +}; use crate::database::redis::RedisPool; use crate::models::v3::notifications::NotificationBody; use crate::routes::ApiError; +use ariadne::ids::base62_impl::to_base62; use futures::TryFutureExt; use lettre::Message; use lettre::message::{Mailbox, MultiPart, SinglePart}; @@ -38,6 +40,27 @@ const STATUSCHANGE_PROJECT_NAME: &str = "statuschange.project.name"; const STATUSCHANGE_OLD_STATUS: &str = "statuschange.old.status"; const STATUSCHANGE_NEW_STATUS: &str = "statuschange.new.status"; +const NEWPAT_TOKEN_NAME: &str = "newpat.token_name"; + +const PROJECT_ID: &str = "project.id"; +const PROJECT_NAME: &str = "project.name"; +const PROJECT_ICON_URL: &str = "project.icon_url"; + +const REPORT_ID: &str = "report.id"; +const REPORT_TITLE: &str = "report.title"; +const REPORT_DATE: &str = "report.date"; +const NEWREPORT_ID: &str = "newreport.id"; + +const PROJECT_OLD_STATUS: &str = "project.oldstatus"; +const PROJECT_NEW_STATUS: &str = "project.newstatus"; + +const NEWOWNER_TYPE: &str = "new_owner.type"; +const NEWOWNER_TYPE_CAPITALIZED: &str = "new_owner.type_capitalized"; +const NEWOWNER_NAME: &str = "new_owner.name"; + +const PAYOUTAVAILABLE_AMOUNT: &str = "payout.amount"; +const PAYOUTAVAILABLE_PERIOD: &str = "payout.period"; + #[derive(Clone)] pub struct MailingIdentity { from_name: String, @@ -59,7 +82,7 @@ impl MailingIdentity { #[allow(clippy::too_many_arguments)] pub async fn build_email( - exec: impl sqlx::PgExecutor<'_>, + exec: &mut sqlx::PgTransaction<'_>, redis: &RedisPool, client: &reqwest::Client, user_id: DBUserId, @@ -181,27 +204,177 @@ fn fill_template( } async fn collect_template_variables( - exec: impl sqlx::PgExecutor<'_>, + exec: &mut sqlx::PgTransaction<'_>, redis: &RedisPool, user_id: DBUserId, n: &NotificationBody, ) -> Result, ApiError> { - async fn only_select_default_variables( - exec: impl sqlx::PgExecutor<'_>, - redis: &RedisPool, - user_id: DBUserId, - ) -> Result, ApiError> { - let mut map = HashMap::new(); + let db_user = DBUser::get_id(user_id, &mut **exec, redis) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?; - let user = DBUser::get_id(user_id, exec, redis) - .await? - .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?; - - map.insert(USER_NAME, user.username); - Ok(map) - } + let mut map = HashMap::new(); + map.insert(USER_NAME, db_user.username); match &n { + NotificationBody::PatCreated { token_name } => { + map.insert(NEWPAT_TOKEN_NAME, token_name.clone()); + Ok(map) + } + + NotificationBody::ModerationMessageReceived { project_id, .. } => { + let result = DBProject::get_id( + DBProjectId(project_id.0 as i64), + exec, + redis, + ) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))? + .inner; + + map.insert(PROJECT_ID, to_base62(project_id.0)); + map.insert(PROJECT_NAME, result.name); + map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); + Ok(map) + } + + NotificationBody::ReportStatusUpdated { report_id } => { + let result = query!( + r#" + SELECT + r.created, + COALESCE(m.name, v.version_number, u.username, 'unknown') "title!" + FROM reports r + LEFT JOIN mods m ON r.mod_id = m.id + LEFT JOIN versions v ON r.version_id = v.id + LEFT JOIN users u ON r.user_id = u.id + WHERE r.id = $1 + "#, + report_id.0 as i64 + ) + .fetch_one(&mut **exec) + .await?; + + map.insert(REPORT_ID, to_base62(report_id.0)); + map.insert(REPORT_TITLE, result.title); + map.insert(REPORT_DATE, date_human_readable(result.created)); + Ok(map) + } + + NotificationBody::ReportSubmitted { report_id } => { + let result = query!( + r#" + SELECT + COALESCE(m.name, v.version_number, u.username, 'unknown') "title!" + FROM reports r + LEFT JOIN mods m ON r.mod_id = m.id + LEFT JOIN versions v ON r.version_id = v.id + LEFT JOIN users u ON r.user_id = u.id + WHERE r.id = $1 + "#, + report_id.0 as i64 + ) + .fetch_one(&mut **exec) + .await?; + + map.insert(REPORT_TITLE, result.title); + map.insert(NEWREPORT_ID, to_base62(report_id.0)); + Ok(map) + } + + NotificationBody::ProjectStatusApproved { project_id } => { + let result = query!( + r#" + SELECT name, icon_url FROM mods WHERE id = $1 + "#, + project_id.0 as i64 + ) + .fetch_one(&mut **exec) + .await?; + + map.insert(PROJECT_ID, to_base62(project_id.0)); + map.insert(PROJECT_NAME, result.name); + map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); + Ok(map) + } + + NotificationBody::ProjectStatusNeutral { + project_id, + old_status, + new_status, + } => { + let result = DBProject::get_id( + DBProjectId(project_id.0 as i64), + exec, + redis, + ) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))? + .inner; + + map.insert(PROJECT_ID, to_base62(project_id.0)); + map.insert(PROJECT_NAME, result.name); + map.insert(PROJECT_ICON_URL, result.icon_url.unwrap_or_default()); + map.insert(PROJECT_OLD_STATUS, old_status.as_str().to_string()); + map.insert(PROJECT_NEW_STATUS, new_status.as_str().to_string()); + Ok(map) + } + + NotificationBody::ProjectTransferred { + project_id, + new_owner_user_id, + new_owner_organization_id, + } => { + let project = DBProject::get_id( + DBProjectId(project_id.0 as i64), + &mut **exec, + redis, + ) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))? + .inner; + + map.insert(PROJECT_ID, to_base62(project_id.0)); + map.insert(PROJECT_NAME, project.name); + map.insert(PROJECT_ICON_URL, project.icon_url.unwrap_or_default()); + + if let Some(new_owner_user_id) = new_owner_user_id { + let user = DBUser::get_id( + DBUserId(new_owner_user_id.0 as i64), + &mut **exec, + redis, + ) + .await? + .ok_or_else(|| { + DatabaseError::Database(sqlx::Error::RowNotFound) + })?; + + map.insert(NEWOWNER_TYPE, "user".to_string()); + map.insert(NEWOWNER_TYPE_CAPITALIZED, "User".to_string()); + map.insert(NEWOWNER_NAME, user.username); + } else if let Some(new_owner_organization_id) = + new_owner_organization_id + { + let org = DBOrganization::get_id( + DBOrganizationId(new_owner_organization_id.0 as i64), + &mut **exec, + redis, + ) + .await? + .ok_or_else(|| { + DatabaseError::Database(sqlx::Error::RowNotFound) + })?; + + map.insert(NEWOWNER_TYPE, "organization".to_string()); + map.insert( + NEWOWNER_TYPE_CAPITALIZED, + "Organization".to_string(), + ); + map.insert(NEWOWNER_NAME, org.name); + } + + Ok(map) + } NotificationBody::TeamInvite { team_id: _, project_id, @@ -224,11 +397,9 @@ async fn collect_template_variables( project_id.0 as i64, user_id.0 as i64 ) - .fetch_one(exec) + .fetch_one(&mut **exec) .await?; - let mut map = HashMap::new(); - map.insert(USER_NAME, result.user_name); map.insert(TEAMINVITE_INVITER_NAME, result.inviter_name); map.insert(TEAMINVITE_PROJECT_NAME, result.project_name); map.insert(TEAMINVITE_ROLE_NAME, role.clone()); @@ -258,11 +429,9 @@ async fn collect_template_variables( organization_id.0 as i64, user_id.0 as i64 ) - .fetch_one(exec) + .fetch_one(&mut **exec) .await?; - let mut map = HashMap::new(); - map.insert(USER_NAME, result.user_name); map.insert(ORGINVITE_INVITER_NAME, result.inviter_name); map.insert(ORGINVITE_ORG_NAME, result.organization_name); map.insert(ORGINVITE_ROLE_NAME, role.clone()); @@ -288,11 +457,9 @@ async fn collect_template_variables( project_id.0 as i64, user_id.0 as i64, ) - .fetch_one(exec) + .fetch_one(&mut **exec) .await?; - let mut map = HashMap::new(); - map.insert(USER_NAME, result.user_name); map.insert(STATUSCHANGE_PROJECT_NAME, result.project_name); map.insert(STATUSCHANGE_OLD_STATUS, old_status.as_str().to_owned()); map.insert(STATUSCHANGE_NEW_STATUS, new_status.as_str().to_owned()); @@ -308,13 +475,7 @@ async fn collect_template_variables( flow ); - let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( - || DatabaseError::Database(sqlx::Error::RowNotFound), - )?; - - let mut map = HashMap::new(); map.insert(RESETPASSWORD_URL, url); - map.insert(USER_NAME, user.username); Ok(map) } @@ -327,25 +488,13 @@ async fn collect_template_variables( flow ); - let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( - || DatabaseError::Database(sqlx::Error::RowNotFound), - )?; - - let mut map = HashMap::new(); map.insert(VERIFYEMAIL_URL, url); - map.insert(USER_NAME, user.username); Ok(map) } NotificationBody::AuthProviderAdded { provider } | NotificationBody::AuthProviderRemoved { provider } => { - let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( - || DatabaseError::Database(sqlx::Error::RowNotFound), - )?; - - let mut map = HashMap::new(); - map.insert(USER_NAME, user.username); map.insert(AUTHPROVIDER_NAME, provider.clone()); Ok(map) @@ -354,30 +503,18 @@ async fn collect_template_variables( NotificationBody::TwoFactorEnabled | NotificationBody::TwoFactorRemoved | NotificationBody::PasswordChanged - | NotificationBody::PasswordRemoved => { - only_select_default_variables(exec, redis, user_id).await - } + | NotificationBody::PasswordRemoved => Ok(map), NotificationBody::EmailChanged { new_email, to_email: _, } => { - let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( - || DatabaseError::Database(sqlx::Error::RowNotFound), - )?; - - let mut map = HashMap::new(); - map.insert(USER_NAME, user.username); map.insert(EMAILCHANGED_NEW_EMAIL, new_email.clone()); Ok(map) } NotificationBody::PaymentFailed { amount, service } => { - let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( - || DatabaseError::Database(sqlx::Error::RowNotFound), - )?; - let url = format!( "{}/{}", dotenvy::var("SITE_URL")?, @@ -385,7 +522,6 @@ async fn collect_template_variables( ); let mut map = HashMap::new(); - map.insert(USER_NAME, user.username); map.insert(PAYMENTFAILED_AMOUNT, amount.clone()); map.insert(PAYMENTFAILED_SERVICE, service.clone()); map.insert(BILLING_URL, url); @@ -393,11 +529,34 @@ async fn collect_template_variables( Ok(map) } - NotificationBody::ProjectUpdate { .. } - | NotificationBody::LegacyMarkdown { .. } - | NotificationBody::ModeratorMessage { .. } - | NotificationBody::Unknown => { - only_select_default_variables(exec, redis, user_id).await + NotificationBody::PayoutAvailable { + amount, + date_available, + } => { + if let Some(period_month) = + date_available.checked_sub_months(chrono::Months::new(2)) + { + map.insert( + PAYOUTAVAILABLE_PERIOD, + period_month.format("%B %Y").to_string(), + ); + } + + map.insert( + PAYOUTAVAILABLE_AMOUNT, + format!("{:.2}", (amount * 100.0) as i64), + ); + + Ok(map) } + + NotificationBody::ProjectUpdate { .. } + | NotificationBody::ModeratorMessage { .. } + | NotificationBody::LegacyMarkdown { .. } + | NotificationBody::Unknown => Ok(map), } } + +fn date_human_readable(date: chrono::DateTime) -> String { + date.format("%B %d, %Y").to_string() +} diff --git a/apps/labrinth/src/queue/moderation.rs b/apps/labrinth/src/queue/moderation.rs index 010e70f3..95953282 100644 --- a/apps/labrinth/src/queue/moderation.rs +++ b/apps/labrinth/src/queue/moderation.rs @@ -711,7 +711,19 @@ impl AutomatedModerationQueue { }, } .insert_many( - members.into_iter().map(|x| x.user_id).collect(), + members.iter().map(|x| x.user_id).collect(), + &mut transaction, + &redis, + ) + .await?; + + NotificationBuilder { + body: NotificationBody::ModerationMessageReceived { + project_id: project.inner.id.into(), + }, + } + .insert_many( + members.iter().map(|x| x.user_id).collect(), &mut transaction, &redis, ) diff --git a/apps/labrinth/src/queue/payouts.rs b/apps/labrinth/src/queue/payouts.rs index 90ebc5f5..86704e12 100644 --- a/apps/labrinth/src/queue/payouts.rs +++ b/apps/labrinth/src/queue/payouts.rs @@ -1,3 +1,6 @@ +use crate::database::models::notification_item::NotificationBuilder; +use crate::database::models::payouts_values_notifications; +use crate::database::redis::RedisPool; use crate::models::payouts::{ PayoutDecimal, PayoutInterval, PayoutMethod, PayoutMethodFee, PayoutMethodType, @@ -1084,6 +1087,41 @@ pub async fn insert_payouts( .await } +pub async fn index_payouts_notifications( + pool: &PgPool, + redis: &RedisPool, +) -> Result<(), ApiError> { + let mut transaction = pool.begin().await?; + + payouts_values_notifications::synchronize_future_payout_values( + &mut *transaction, + ) + .await?; + let items = payouts_values_notifications::PayoutsValuesNotification::unnotified_users_with_available_payouts_with_limit(&mut *transaction, 200).await?; + + let payout_ref_ids = items.iter().map(|x| x.id).collect::>(); + let dates_available = + items.iter().map(|x| x.date_available).collect::>(); + let user_ids = items.iter().map(|x| x.user_id).collect::>(); + + NotificationBuilder::insert_many_payout_notifications( + user_ids, + dates_available, + &mut transaction, + redis, + ) + .await?; + payouts_values_notifications::PayoutsValuesNotification::set_notified_many( + &payout_ref_ids, + &mut *transaction, + ) + .await?; + + transaction.commit().await?; + + Ok(()) +} + pub async fn insert_bank_balances_and_webhook( payouts: &PayoutsQueue, pool: &PgPool, diff --git a/apps/labrinth/src/routes/internal/pats.rs b/apps/labrinth/src/routes/internal/pats.rs index fa10d97c..7070c72b 100644 --- a/apps/labrinth/src/routes/internal/pats.rs +++ b/apps/labrinth/src/routes/internal/pats.rs @@ -13,6 +13,8 @@ use rand::distributions::Alphanumeric; use rand_chacha::ChaCha20Rng; use rand_chacha::rand_core::SeedableRng; +use crate::database::models::notification_item::NotificationBuilder; +use crate::models::notifications::NotificationBody; use crate::models::pats::{PersonalAccessToken, Scopes}; use crate::queue::session::AuthQueue; use crate::util::validate::validation_errors_to_string; @@ -129,7 +131,15 @@ pub async fn create_pat( .insert(&mut transaction) .await?; + NotificationBuilder { + body: NotificationBody::PatCreated { + token_name: name.clone(), + }, + } + .insert(user.id.into(), &mut transaction, &redis) + .await?; transaction.commit().await?; + database::models::pat_item::DBPersonalAccessToken::clear_cache( vec![(None, None, Some(user.id.into()))], &redis, diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index f8012895..43e67dff 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -476,6 +476,22 @@ pub async fn project_edit( new_status: *status, }, } + .insert_many(notified_members.clone(), &mut transaction, &redis) + .await?; + + NotificationBuilder { + body: if status.is_approved() { + NotificationBody::ProjectStatusApproved { + project_id: project_item.inner.id.into(), + } + } else { + NotificationBody::ProjectStatusNeutral { + project_id: project_item.inner.id.into(), + old_status: project_item.inner.status, + new_status: *status, + } + }, + } .insert_many(notified_members, &mut transaction, &redis) .await?; } diff --git a/apps/labrinth/src/routes/v3/reports.rs b/apps/labrinth/src/routes/v3/reports.rs index ee009444..03346201 100644 --- a/apps/labrinth/src/routes/v3/reports.rs +++ b/apps/labrinth/src/routes/v3/reports.rs @@ -1,6 +1,7 @@ use crate::auth::{check_is_moderator_from_headers, get_user_from_headers}; use crate::database; use crate::database::models::image_item; +use crate::database::models::notification_item::NotificationBuilder; use crate::database::models::thread_item::{ ThreadBuilder, ThreadMessageBuilder, }; @@ -8,6 +9,7 @@ use crate::database::redis::RedisPool; use crate::models::ids::ImageId; use crate::models::ids::{ProjectId, VersionId}; use crate::models::images::{Image, ImageContext}; +use crate::models::notifications::NotificationBody; use crate::models::pats::Scopes; use crate::models::reports::{ItemType, Report}; use crate::models::threads::{MessageBody, ThreadType}; @@ -204,6 +206,15 @@ pub async fn report_create( .insert(&mut transaction) .await?; + // Notify the reporter that the report has been submitted + NotificationBuilder { + body: NotificationBody::ReportSubmitted { + report_id: id.into(), + }, + } + .insert(current_user.id.into(), &mut transaction, &redis) + .await?; + transaction.commit().await?; Ok(HttpResponse::Ok().json(Report { @@ -455,6 +466,14 @@ pub async fn report_edit( .insert(&mut transaction) .await?; + NotificationBuilder { + body: NotificationBody::ReportStatusUpdated { + report_id: id.into(), + }, + } + .insert(report.reporter, &mut transaction, &redis) + .await?; + sqlx::query!( " UPDATE reports diff --git a/apps/labrinth/src/routes/v3/teams.rs b/apps/labrinth/src/routes/v3/teams.rs index 1a6b3e16..0c8ca198 100644 --- a/apps/labrinth/src/routes/v3/teams.rs +++ b/apps/labrinth/src/routes/v3/teams.rs @@ -878,7 +878,7 @@ pub async fn transfer_ownership( // Forbid transferring ownership of a project team that is owned by an organization // These are owned by the organization owner, and must be removed from the organization first - // There shouldnt be an ownr on these projects in these cases, but just in case. + // There shouldnt be an owner on these projects in these cases, but just in case. let team_association_id = DBTeam::get_association(id.into(), &**pool).await?; if let Some(TeamAssociationId::Project(pid)) = team_association_id { @@ -1018,7 +1018,21 @@ pub async fn transfer_ownership( vec![] }; + // If this team is associated with a project, notify the new owner + if let Some(TeamAssociationId::Project(pid)) = team_association_id { + NotificationBuilder { + body: NotificationBody::ProjectTransferred { + project_id: pid.into(), + new_owner_user_id: Some(new_owner.user_id), + new_owner_organization_id: None, + }, + } + .insert(new_owner.user_id.into(), &mut transaction, &redis) + .await?; + } + transaction.commit().await?; + DBTeamMember::clear_cache(id.into(), &redis).await?; for team_id in project_teams_edited { DBTeamMember::clear_cache(team_id, &redis).await?; diff --git a/apps/labrinth/src/routes/v3/threads.rs b/apps/labrinth/src/routes/v3/threads.rs index 71d29300..1a55f7b7 100644 --- a/apps/labrinth/src/routes/v3/threads.rs +++ b/apps/labrinth/src/routes/v3/threads.rs @@ -477,7 +477,19 @@ pub async fn thread_send_message( }, } .insert_many( - members.into_iter().map(|x| x.user_id).collect(), + members.iter().map(|x| x.user_id).collect(), + &mut transaction, + &redis, + ) + .await?; + + NotificationBuilder { + body: NotificationBody::ModerationMessageReceived { + project_id: project.inner.id.into(), + }, + } + .insert_many( + members.iter().map(|x| x.user_id).collect(), &mut transaction, &redis, )