diff --git a/Cargo.lock b/Cargo.lock index 630858ee..68f3dcbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4442,11 +4442,13 @@ version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cb54db6ff7a89efac87dba5baeac57bb9ccd726b49a9b6f21fb92b3966aaf56" dependencies = [ + "async-trait", "base64 0.22.1", "chumsky", "email-encoding", "email_address", "fastrand 2.3.0", + "futures-io", "futures-util", "hostname", "httpdate", @@ -4459,6 +4461,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "socket2 0.6.0", "tokio", + "tokio-rustls 0.26.2", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 07485bb3..5f0a461f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,9 @@ lettre = { version = "0.11.18", default-features = false, features = [ "ring", "rustls", "rustls-native-certs", + "tokio1-rustls", "smtp-transport", + "tokio1", ] } maxminddb = "0.26.0" meilisearch-sdk = { version = "0.29.1", default-features = false } diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index fa70155e..0dc3fbc5 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -6,6 +6,7 @@ SITE_URL=http://localhost:3000 # This CDN URL matches the local storage backend set below, which uses MOCK_FILE_PATH CDN_URL=file:///tmp/modrinth LABRINTH_ADMIN_KEY=feedbeef +LABRINTH_EXTERNAL_NOTIFICATION_KEY=beeffeed RATE_LIMIT_IGNORE_KEY=feedbeef DATABASE_URL=postgresql://labrinth:labrinth@localhost/labrinth diff --git a/apps/labrinth/.sqlx/query-006813fc9b61e5333484e7c6443f0325fd64f9ab965fed3f973adeced8719128.json b/apps/labrinth/.sqlx/query-006813fc9b61e5333484e7c6443f0325fd64f9ab965fed3f973adeced8719128.json new file mode 100644 index 00000000..cb92e14c --- /dev/null +++ b/apps/labrinth/.sqlx/query-006813fc9b61e5333484e7c6443f0325fd64f9ab965fed3f973adeced8719128.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COALESCE(unp.id, dnp.id) \"id!\",\n unp.user_id,\n dnp.channel \"channel!\",\n dnp.notification_type \"notification_type!\",\n COALESCE(unp.enabled, dnp.enabled, false) \"enabled!\"\n FROM users_notifications_preferences dnp\n LEFT JOIN users_notifications_preferences unp\n ON unp.channel = dnp.channel\n AND unp.notification_type = dnp.notification_type\n AND unp.user_id = ANY($1::bigint[])\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "user_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "channel!", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "notification_type!", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "enabled!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + null, + true, + false, + false, + null + ] + }, + "hash": "006813fc9b61e5333484e7c6443f0325fd64f9ab965fed3f973adeced8719128" +} diff --git a/apps/labrinth/.sqlx/query-0339cb166cfc7e78fc1269d5d1547a772977b269d6d01a64a1f93acb86f9e411.json b/apps/labrinth/.sqlx/query-0339cb166cfc7e78fc1269d5d1547a772977b269d6d01a64a1f93acb86f9e411.json new file mode 100644 index 00000000..9b4a45df --- /dev/null +++ b/apps/labrinth/.sqlx/query-0339cb166cfc7e78fc1269d5d1547a772977b269d6d01a64a1f93acb86f9e411.json @@ -0,0 +1,76 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT n.id, n.user_id, n.name, n.text, n.link, n.created, n.read, n.type notification_type, n.body,\n JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions\n FROM notifications n\n LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id\n INNER JOIN notifications_types nt on nt.name = n.body ->> 'type'\n WHERE n.user_id = $1\n AND nt.expose_in_site_notifications = TRUE\n GROUP BY n.id, n.user_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "user_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "name", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "text", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "link", + "type_info": "Varchar" + }, + { + "ordinal": 5, + "name": "created", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "read", + "type_info": "Bool" + }, + { + "ordinal": 7, + "name": "notification_type", + "type_info": "Varchar" + }, + { + "ordinal": 8, + "name": "body", + "type_info": "Jsonb" + }, + { + "ordinal": 9, + "name": "actions", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + true, + true, + true, + false, + false, + true, + true, + null + ] + }, + "hash": "0339cb166cfc7e78fc1269d5d1547a772977b269d6d01a64a1f93acb86f9e411" +} diff --git a/apps/labrinth/.sqlx/query-0c425b9e08bd7a8cefce82adf87cca44340bd51b012ca2fb19a095f1c6038437.json b/apps/labrinth/.sqlx/query-0c425b9e08bd7a8cefce82adf87cca44340bd51b012ca2fb19a095f1c6038437.json new file mode 100644 index 00000000..0f258683 --- /dev/null +++ b/apps/labrinth/.sqlx/query-0c425b9e08bd7a8cefce82adf87cca44340bd51b012ca2fb19a095f1c6038437.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM notifications_deliveries\n WHERE notification_id = ANY($1)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [] + }, + "hash": "0c425b9e08bd7a8cefce82adf87cca44340bd51b012ca2fb19a095f1c6038437" +} diff --git a/apps/labrinth/.sqlx/query-0e29dad2b228ca4922811bb45f05f39145489302a4e9bc25eeed49c97d3dc01e.json b/apps/labrinth/.sqlx/query-0e29dad2b228ca4922811bb45f05f39145489302a4e9bc25eeed49c97d3dc01e.json new file mode 100644 index 00000000..662538c8 --- /dev/null +++ b/apps/labrinth/.sqlx/query-0e29dad2b228ca4922811bb45f05f39145489302a4e9bc25eeed49c97d3dc01e.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n users.username \"user_name!\",\n users.email \"user_email\",\n project.name \"project_name!\"\n FROM users\n INNER JOIN mods project ON project.id = $1\n WHERE users.id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_name!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "user_email", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "project_name!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + true, + false + ] + }, + "hash": "0e29dad2b228ca4922811bb45f05f39145489302a4e9bc25eeed49c97d3dc01e" +} diff --git a/apps/labrinth/.sqlx/query-5f7ce5881b9051f2a2e88577f8851a8e367c8914fa40ff2224dcb907284339d8.json b/apps/labrinth/.sqlx/query-5f7ce5881b9051f2a2e88577f8851a8e367c8914fa40ff2224dcb907284339d8.json new file mode 100644 index 00000000..e99410a8 --- /dev/null +++ b/apps/labrinth/.sqlx/query-5f7ce5881b9051f2a2e88577f8851a8e367c8914fa40ff2224dcb907284339d8.json @@ -0,0 +1,66 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id, notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count\n FROM notifications_deliveries\n WHERE\n status = $3\n AND channel = $1\n AND next_attempt <= NOW()\n ORDER BY\n delivery_priority DESC,\n next_attempt ASC\n LIMIT $2\n FOR UPDATE\n SKIP LOCKED\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "notification_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "user_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "channel", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "delivery_priority", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "status", + "type_info": "Varchar" + }, + { + "ordinal": 6, + "name": "next_attempt", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "attempt_count", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "5f7ce5881b9051f2a2e88577f8851a8e367c8914fa40ff2224dcb907284339d8" +} diff --git a/apps/labrinth/.sqlx/query-66f890fcf2761869e5580c82ea5054c8e5ce839fb4a6c2d94b9621b57cb0e02c.json b/apps/labrinth/.sqlx/query-66f890fcf2761869e5580c82ea5054c8e5ce839fb4a6c2d94b9621b57cb0e02c.json new file mode 100644 index 00000000..244422f6 --- /dev/null +++ b/apps/labrinth/.sqlx/query-66f890fcf2761869e5580c82ea5054c8e5ce839fb4a6c2d94b9621b57cb0e02c.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO users_notifications_preferences (\n user_id, channel, notification_type, enabled\n )\n VALUES ($1, $2, $3, $4)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Varchar", + "Varchar", + "Bool" + ] + }, + "nullable": [ + false + ] + }, + "hash": "66f890fcf2761869e5580c82ea5054c8e5ce839fb4a6c2d94b9621b57cb0e02c" +} diff --git a/apps/labrinth/.sqlx/query-8399e818bbe8642304b2e30dcac511f8242cb66d6daedfdcd9627462dc08b2f1.json b/apps/labrinth/.sqlx/query-8399e818bbe8642304b2e30dcac511f8242cb66d6daedfdcd9627462dc08b2f1.json new file mode 100644 index 00000000..126e94c0 --- /dev/null +++ b/apps/labrinth/.sqlx/query-8399e818bbe8642304b2e30dcac511f8242cb66d6daedfdcd9627462dc08b2f1.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n channels AS (\n SELECT channel FROM UNNEST($1::varchar[]) AS t(channel)\n ),\n delivery_candidates AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n channels.channel,\n nt.delivery_priority,\n uprefs.enabled user_enabled,\n dprefs.enabled default_enabled\n FROM\n UNNEST(\n $2::bigint[],\n $3::bigint[],\n $4::varchar[]\n ) AS ids(notification_id, user_id, notification_type)\n CROSS JOIN channels\n INNER JOIN\n notifications_types nt ON nt.name = ids.notification_type\n LEFT JOIN users_notifications_preferences uprefs\n ON uprefs.user_id = ids.user_id\n AND uprefs.channel = channels.channel\n AND uprefs.notification_type = ids.notification_type\n LEFT JOIN users_notifications_preferences dprefs\n ON dprefs.user_id IS NULL\n AND dprefs.channel = channels.channel\n AND dprefs.notification_type = ids.notification_type\n )\n INSERT INTO notifications_deliveries\n (notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count)\n SELECT\n dc.notification_id,\n dc.user_id,\n dc.channel,\n dc.delivery_priority,\n CASE\n -- User explicitly enabled\n WHEN user_enabled = TRUE THEN $5\n\n -- Is enabled by default, no preference by user\n WHEN user_enabled IS NULL AND default_enabled = TRUE THEN $5\n\n -- User explicitly disabled (regardless of default)\n WHEN user_enabled = FALSE THEN $6\n\n -- User set no preference, default disabled\n WHEN user_enabled IS NULL AND default_enabled = FALSE THEN $7\n\n -- At this point, user set no preference and there is no\n -- default set, so treat as disabled-by-default.\n ELSE $7\n END status,\n NOW() next_attempt,\n 0 attempt_count\n FROM\n delivery_candidates dc\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "VarcharArray", + "Int8Array", + "Int8Array", + "VarcharArray", + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "8399e818bbe8642304b2e30dcac511f8242cb66d6daedfdcd9627462dc08b2f1" +} diff --git a/apps/labrinth/.sqlx/query-91e4b5a08579246e2eca91c1c38f0e8ff3d11077e172f103b65044aab2f90a91.json b/apps/labrinth/.sqlx/query-91e4b5a08579246e2eca91c1c38f0e8ff3d11077e172f103b65044aab2f90a91.json new file mode 100644 index 00000000..3d482060 --- /dev/null +++ b/apps/labrinth/.sqlx/query-91e4b5a08579246e2eca91c1c38f0e8ff3d11077e172f103b65044aab2f90a91.json @@ -0,0 +1,42 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n users.username \"user_name!\",\n users.email \"user_email\",\n inviter.username \"inviter_name!\",\n project.name \"project_name!\"\n FROM users\n INNER JOIN users inviter ON inviter.id = $1\n INNER JOIN mods project ON project.id = $2\n WHERE users.id = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_name!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "user_email", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "inviter_name!", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "project_name!", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + true, + false, + false + ] + }, + "hash": "91e4b5a08579246e2eca91c1c38f0e8ff3d11077e172f103b65044aab2f90a91" +} diff --git a/apps/labrinth/.sqlx/query-971bbd54f168da93b39b8550776157ff82a679798ea198e52091c75d31bc5e7c.json b/apps/labrinth/.sqlx/query-971bbd54f168da93b39b8550776157ff82a679798ea198e52091c75d31bc5e7c.json new file mode 100644 index 00000000..58cb6757 --- /dev/null +++ b/apps/labrinth/.sqlx/query-971bbd54f168da93b39b8550776157ff82a679798ea198e52091c75d31bc5e7c.json @@ -0,0 +1,42 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n users.username \"user_name!\",\n users.email \"user_email\",\n inviter.username \"inviter_name!\",\n organization.name \"organization_name!\"\n FROM users\n INNER JOIN users inviter ON inviter.id = $1\n INNER JOIN organizations organization ON organization.id = $2\n WHERE users.id = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_name!", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "user_email", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "inviter_name!", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "organization_name!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + true, + false, + false + ] + }, + "hash": "971bbd54f168da93b39b8550776157ff82a679798ea198e52091c75d31bc5e7c" +} diff --git a/apps/labrinth/.sqlx/query-a04c04cfb025e36dddd78638fd042792dbf6a1d83a15d0d08b5ce589063eefd4.json b/apps/labrinth/.sqlx/query-a04c04cfb025e36dddd78638fd042792dbf6a1d83a15d0d08b5ce589063eefd4.json new file mode 100644 index 00000000..cd2a80dd --- /dev/null +++ b/apps/labrinth/.sqlx/query-a04c04cfb025e36dddd78638fd042792dbf6a1d83a15d0d08b5ce589063eefd4.json @@ -0,0 +1,64 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id, notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count\n FROM notifications_deliveries\n WHERE user_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "notification_id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "user_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "channel", + "type_info": "Varchar" + }, + { + "ordinal": 4, + "name": "delivery_priority", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "status", + "type_info": "Varchar" + }, + { + "ordinal": 6, + "name": "next_attempt", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "attempt_count", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "a04c04cfb025e36dddd78638fd042792dbf6a1d83a15d0d08b5ce589063eefd4" +} diff --git a/apps/labrinth/.sqlx/query-a92900cba0e27410d29910c991b9a161ef58e39455454e5b3a380ed62eb15eb2.json b/apps/labrinth/.sqlx/query-a92900cba0e27410d29910c991b9a161ef58e39455454e5b3a380ed62eb15eb2.json new file mode 100644 index 00000000..6b10485c --- /dev/null +++ b/apps/labrinth/.sqlx/query-a92900cba0e27410d29910c991b9a161ef58e39455454e5b3a380ed62eb15eb2.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE notifications_deliveries\n SET\n delivery_priority = $2,\n status = $3,\n next_attempt = $4,\n attempt_count = $5\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Varchar", + "Timestamptz", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "a92900cba0e27410d29910c991b9a161ef58e39455454e5b3a380ed62eb15eb2" +} diff --git a/apps/labrinth/.sqlx/query-b3371c0ff555f8f90ced4c4b1f397863e65d9aafe06f77703db18b492e6a9c03.json b/apps/labrinth/.sqlx/query-b3371c0ff555f8f90ced4c4b1f397863e65d9aafe06f77703db18b492e6a9c03.json new file mode 100644 index 00000000..3f589630 --- /dev/null +++ b/apps/labrinth/.sqlx/query-b3371c0ff555f8f90ced4c4b1f397863e65d9aafe06f77703db18b492e6a9c03.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT * FROM notifications_templates WHERE channel = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "channel", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "notification_type", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "subject_line", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "body_fetch_url", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "plaintext_fallback", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "b3371c0ff555f8f90ced4c4b1f397863e65d9aafe06f77703db18b492e6a9c03" +} diff --git a/apps/labrinth/.sqlx/query-dc05295852b5a1d49be7906cd248566ffdfe790d7b61bd69969b00d558b41804.json b/apps/labrinth/.sqlx/query-bc21f3bef3585780f445725576ca6a1a9e89a896a8e8cfaae46137d22d40a837.json similarity index 92% rename from apps/labrinth/.sqlx/query-dc05295852b5a1d49be7906cd248566ffdfe790d7b61bd69969b00d558b41804.json rename to apps/labrinth/.sqlx/query-bc21f3bef3585780f445725576ca6a1a9e89a896a8e8cfaae46137d22d40a837.json index a6e27474..4850efeb 100644 --- a/apps/labrinth/.sqlx/query-dc05295852b5a1d49be7906cd248566ffdfe790d7b61bd69969b00d558b41804.json +++ b/apps/labrinth/.sqlx/query-bc21f3bef3585780f445725576ca6a1a9e89a896a8e8cfaae46137d22d40a837.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT n.id, n.user_id, n.name, n.text, n.link, n.created, n.read, n.type notification_type, n.body,\n JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions\n FROM notifications n\n LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id\n WHERE n.user_id = $1\n GROUP BY n.id, n.user_id;\n ", + "query": "\n SELECT n.id, n.user_id, n.name, n.text, n.link, n.created, n.read, n.type notification_type, n.body,\n JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions\n FROM notifications n\n LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id\n WHERE n.user_id = $1\n GROUP BY n.id, n.user_id\n ", "describe": { "columns": [ { @@ -72,5 +72,5 @@ null ] }, - "hash": "dc05295852b5a1d49be7906cd248566ffdfe790d7b61bd69969b00d558b41804" + "hash": "bc21f3bef3585780f445725576ca6a1a9e89a896a8e8cfaae46137d22d40a837" } diff --git a/apps/labrinth/.sqlx/query-c8ae8b814a1877a5fd3919a87ad41ed4ac11e74f3640594939fd964ee7bf75c0.json b/apps/labrinth/.sqlx/query-c8ae8b814a1877a5fd3919a87ad41ed4ac11e74f3640594939fd964ee7bf75c0.json new file mode 100644 index 00000000..a2b45531 --- /dev/null +++ b/apps/labrinth/.sqlx/query-c8ae8b814a1877a5fd3919a87ad41ed4ac11e74f3640594939fd964ee7bf75c0.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO notifications_deliveries (\n notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Varchar", + "Int4", + "Varchar", + "Timestamptz", + "Int4" + ] + }, + "nullable": [ + false + ] + }, + "hash": "c8ae8b814a1877a5fd3919a87ad41ed4ac11e74f3640594939fd964ee7bf75c0" +} diff --git a/apps/labrinth/.sqlx/query-f39c5338f0776255c35d13c98e4d4e10bb9a871d420a3315aa8617bb2aa0d679.json b/apps/labrinth/.sqlx/query-f39c5338f0776255c35d13c98e4d4e10bb9a871d420a3315aa8617bb2aa0d679.json new file mode 100644 index 00000000..dce17e61 --- /dev/null +++ b/apps/labrinth/.sqlx/query-f39c5338f0776255c35d13c98e4d4e10bb9a871d420a3315aa8617bb2aa0d679.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM notifications_types", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "delivery_priority", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "expose_in_user_preferences", + "type_info": "Bool" + }, + { + "ordinal": 3, + "name": "expose_in_site_notifications", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "f39c5338f0776255c35d13c98e4d4e10bb9a871d420a3315aa8617bb2aa0d679" +} diff --git a/apps/labrinth/.sqlx/query-fbd89475ed4a963bfced02d56aec048c797855bbd1e57c18d1f0a5392493c9ec.json b/apps/labrinth/.sqlx/query-fbd89475ed4a963bfced02d56aec048c797855bbd1e57c18d1f0a5392493c9ec.json new file mode 100644 index 00000000..4a0a5017 --- /dev/null +++ b/apps/labrinth/.sqlx/query-fbd89475ed4a963bfced02d56aec048c797855bbd1e57c18d1f0a5392493c9ec.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) \"count!\" FROM users WHERE id = ANY($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + null + ] + }, + "hash": "fbd89475ed4a963bfced02d56aec048c797855bbd1e57c18d1f0a5392493c9ec" +} diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index c0dc09a9..fbc7c72c 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -35,7 +35,12 @@ paste.workspace = true meilisearch-sdk = { workspace = true, features = ["reqwest"] } rust-s3.workspace = true -reqwest = { workspace = true, features = ["http2", "rustls-tls-webpki-roots", "json", "multipart"] } +reqwest = { workspace = true, features = [ + "http2", + "rustls-tls-webpki-roots", + "json", + "multipart", +] } hyper-rustls.workspace = true hyper-util.workspace = true @@ -85,7 +90,10 @@ sqlx = { workspace = true, features = [ "rust_decimal", "json", ] } -rust_decimal = { workspace = true, features = ["serde-with-float", "serde-with-str"] } +rust_decimal = { workspace = true, features = [ + "serde-with-float", + "serde-with-str", +] } redis = { workspace = true, features = ["tokio-comp", "ahash", "r2d2"] } deadpool-redis.workspace = true clickhouse = { workspace = true, features = ["uuid", "time"] } @@ -124,7 +132,12 @@ lettre.workspace = true rust_iso3166.workspace = true -async-stripe = { workspace = true, features = ["billing", "checkout", "connect", "webhook-events"] } +async-stripe = { workspace = true, features = [ + "billing", + "checkout", + "connect", + "webhook-events", +] } rusty-money.workspace = true json-patch.workspace = true diff --git a/apps/labrinth/migrations/20250902133943_notification-extension.sql b/apps/labrinth/migrations/20250902133943_notification-extension.sql new file mode 100644 index 00000000..c5dec787 --- /dev/null +++ b/apps/labrinth/migrations/20250902133943_notification-extension.sql @@ -0,0 +1,255 @@ +CREATE TABLE notifications_deliveries ( + id BIGSERIAL PRIMARY KEY, + notification_id BIGINT NOT NULL REFERENCES notifications(id), + channel VARCHAR(32) NOT NULL, + user_id BIGINT NOT NULL REFERENCES users(id), + delivery_priority INTEGER NOT NULL, + status VARCHAR(32) NOT NULL, + next_attempt timestamptz NOT NULL, + attempt_count INTEGER NOT NULL, + + UNIQUE (notification_id, channel) +); + +CREATE INDEX idx_notifications_deliveries_composite_queue +ON notifications_deliveries(channel, status, next_attempt ASC, delivery_priority DESC) +INCLUDE (notification_id, user_id); + +CREATE INDEX idx_notifications_deliveries_user_id +ON notifications_deliveries(user_id); + +CREATE TABLE users_notifications_preferences ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT REFERENCES users(id), + channel VARCHAR(32) NOT NULL, + notification_type VARCHAR(32) NOT NULL, + enabled BOOL NOT NULL +); + +CREATE INDEX idx_users_notifications_preferences_user_id +ON users_notifications_preferences(user_id); + +CREATE UNIQUE INDEX idx_users_notifications_preferences_partial_contextual_uniq +ON users_notifications_preferences(COALESCE(user_id, -1), channel, notification_type); + +CREATE TABLE notifications_types ( + name VARCHAR(32) PRIMARY KEY, + delivery_priority INTEGER NOT NULL, + expose_in_user_preferences BOOL NOT NULL, + expose_in_site_notifications BOOL NOT NULL +); + +CREATE TABLE notifications_templates ( + id BIGSERIAL PRIMARY KEY, + channel VARCHAR(32) NOT NULL, + notification_type VARCHAR(32) NOT NULL REFERENCES notifications_types(name), + subject_line TEXT NOT NULL, + body_fetch_url TEXT NOT NULL, + plaintext_fallback TEXT NOT NULL +); + +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('reset_password', 3, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('project_update', 1, TRUE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('team_invite', 1, TRUE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('organization_invite', 1, TRUE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('status_change', 1, TRUE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('moderator_message', 1, TRUE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('legacy_markdown', 1, FALSE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('unknown', 1, FALSE, TRUE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('verify_email', 3, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('auth_provider_added', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('auth_provider_removed', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('two_factor_enabled', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('two_factor_removed', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('password_changed', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('password_removed', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('email_changed', 2, FALSE, FALSE); +INSERT INTO notifications_types (name, delivery_priority, expose_in_user_preferences, expose_in_site_notifications) VALUES ('payment_failed', 2, FALSE, FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'reset_password', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'project_update', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'team_invite', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'organization_invite', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'status_change', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'moderator_message', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'legacy_markdown', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'unknown', FALSE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'verify_email', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'auth_provider_added', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'auth_provider_removed', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'two_factor_enabled', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'two_factor_removed', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'password_changed', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'password_removed', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'email_changed', TRUE); + +INSERT INTO users_notifications_preferences (user_id, channel, notification_type, enabled) +VALUES (NULL, 'email', 'payment_failed', TRUE); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'reset_password', 'Reset your Modrinth password', 'https://modrinth.com/email/reset-password', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Please visit the link below to reset your password. If you did not request for your password to be reset, you can safely ignore this email.', + CHR(10), + 'Reset your password: {resetpassword.url}' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'verify_email', 'Verify your Modrinth email', 'https://modrinth.com/email/verify-email', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Please visit the link below to verify your Modrinth email. If the button does not work, you can copy the link and paste it into your browser. This link expires in 24 hours.', + CHR(10), + 'Verify your email: {verifyemail.url}' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'auth_provider_added', 'Authentication method added', 'https://modrinth.com/email/auth-provider-added', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'When logging into Modrinth, you can now log in using the ', '{authprovider.name}', ' authentication provider.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'auth_provider_removed', 'Authentication method removed', 'https://modrinth.com/email/auth-provider-removed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'When logging into Modrinth, you can no longer log in using the ', '{authprovider.name}', ' authentication provider.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'two_factor_enabled', 'Two-factor authentication enabled', 'https://modrinth.com/email/two-factor-enabled', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'When logging into Modrinth, you can now enter a code generated by your authenticator app in addition to entering your usual email address and password.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'two_factor_removed', 'Two-factor authentication removed', 'https://modrinth.com/email/two-factor-removed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'When logging into Modrinth, you no longer need two-factor authentication to gain access.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'password_changed', 'Your Modrinth password was changed', 'https://modrinth.com/email/password-changed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Your password has been changed on your account.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'password_removed', 'Your Modrinth password was removed', 'https://modrinth.com/email/password-removed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Your password has been removed on your account.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'email_changed', 'Your Modrinth email was changed', 'https://modrinth.com/email/email-changed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Your Modrinth account email has been updated to {emailchanged.new_email}.', + CHR(10), + 'If you did not make this change, please contact us immediately by replying to this email or through our support portal at https://support.modrinth.com (using', + 'the green chat bubble at the bottom of the page)' + ) +); + +INSERT INTO notifications_templates (channel, notification_type, subject_line, body_fetch_url, plaintext_fallback) +VALUES ( + 'email', 'payment_failed', 'Payment Failed for Modrinth', 'https://modrinth.com/email/payment-failed', + CONCAT( + 'Hi {user.name},', + CHR(10), + CHR(10), + 'Our attempt to collect payment for {paymentfailed.amount} from the payment card on file was unsuccessful. Please update your billing settings to avoid service termination.', + CHR(10), + 'Update billing settings: {billing.url}' + ) +); \ No newline at end of file diff --git a/apps/labrinth/src/auth/email/auth_notif.html b/apps/labrinth/src/auth/email/auth_notif.html deleted file mode 100644 index d15bc61f..00000000 --- a/apps/labrinth/src/auth/email/auth_notif.html +++ /dev/null @@ -1,1635 +0,0 @@ - - - - {{ email_title }} - - - - - - - - - - - - - - - - - - - - - -
- - {{ email_description }}͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  -
-
- -
- - - - - - -
- -
- - - - - - - - - -
- - - - - - -
- -
-
- - - - - - -
- -
-
-
- -
-
- -
- - - - - - -
- -
- - - - - - - - - -
-
-

- {{ email_title }} -

-
-
-
-

- {{ line_one }} -

-

-   -

-

- {{ line_two }} -

-
-
-
- -
-
- -
- - - - - - -
- -
- -
- - - - - - - - - - - - - - - -
- - - - - - -
- - modrinth logo -
-
- - - - - - -
- - modrinth logo -
-
-
-

- Rinth, Inc. -

-
-
-
-

- 410 N Scottsdale Road -

-

- Suite 1000 -

-

- Tempe, AZ 85281 -

-
-
-
- -
- - - - - - -
- - -
-
-
- -
- - - - - - -
- - - - - - - -
- - - - - - -
- - Discord -
-
- - - - - - - -
- - - - - - -
- - Twitter -
-
- - - - - - - -
- - - - - - -
- - Mastodon -
-
- - - - - - - -
- - - - - - -
- - GitHub -
-
- - - - - - - -
- - - - - - -
- - YouTube -
-
- - - - - - - -
- - - - - - - -
- - - - - - - -
- - - - - - - -
- - - - - - - -
- -
-
- -
- -
-
- -
- - diff --git a/apps/labrinth/src/auth/email/button_notif.html b/apps/labrinth/src/auth/email/button_notif.html deleted file mode 100644 index 26ed32a5..00000000 --- a/apps/labrinth/src/auth/email/button_notif.html +++ /dev/null @@ -1,1733 +0,0 @@ - - - - {{ email_title }} - - - - - - - - - - - - - - - - - - - - - -
- - {{ email_description }}͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏ ͏  -
-
- -
- - - - - - -
- -
- - - - - - - - - -
- - - - - - -
- -
-
- - - - - - -
- -
-
-
- -
-
- -
- - - - - - -
- -
- - - - - - - - - - - - - - - - - - -
-
-

- {{ email_title }} -

-
-
-
-

- {{ line_one }} -

-

-   -

-

- {{ line_two }} -

-
-
- - - - - - -
- - {{ button_title }} -
-
-
-

- {{ button_link }} -

-
-
-
- -
-
- -
- - - - - - -
- -
- -
- - - - - - - - - - - - - - - -
- - - - - - -
- - modrinth logo -
-
- - - - - - -
- - modrinth logo -
-
-
-

- Rinth, Inc. -

-
-
-
-

- 410 N Scottsdale Road -

-

- Suite 1000 -

-

- Tempe, AZ 85281 -

-
-
-
- -
- - - - - - -
- - -
-
-
- -
- - - - - - -
- - - - - - - -
- - - - - - -
- - Discord -
-
- - - - - - - -
- - - - - - -
- - Twitter -
-
- - - - - - - -
- - - - - - -
- - Mastodon -
-
- - - - - - - -
- - - - - - -
- - GitHub -
-
- - - - - - - -
- - - - - - -
- - YouTube -
-
- - - - - - - -
- - - - - - - -
- - - - - - - -
- - - - - - - -
- - - - - - - -
- -
-
- -
- -
-
- -
- - diff --git a/apps/labrinth/src/auth/email/mod.rs b/apps/labrinth/src/auth/email/mod.rs deleted file mode 100644 index 914a7427..00000000 --- a/apps/labrinth/src/auth/email/mod.rs +++ /dev/null @@ -1,95 +0,0 @@ -use lettre::message::Mailbox; -use lettre::message::header::ContentType; -use lettre::transport::smtp::authentication::Credentials; -use lettre::transport::smtp::client::{Tls, TlsParameters}; -use lettre::{Message, SmtpTransport, Transport}; -use thiserror::Error; -use tracing::warn; - -#[derive(Error, Debug)] -pub enum MailError { - #[error("Environment Error")] - Env(#[from] dotenvy::Error), - #[error("Mail Error: {0}")] - Mail(#[from] lettre::error::Error), - #[error("Address Parse Error: {0}")] - Address(#[from] lettre::address::AddressError), - #[error("SMTP Error: {0}")] - Smtp(#[from] lettre::transport::smtp::Error), -} - -pub fn send_email_raw( - to: String, - subject: String, - body: String, -) -> Result<(), MailError> { - let from_name = dotenvy::var("SMTP_FROM_NAME") - .unwrap_or_else(|_| "Modrinth".to_string()); - let from_address = dotenvy::var("SMTP_FROM_ADDRESS") - .unwrap_or_else(|_| "no-reply@mail.modrinth.com".to_string()); - - let email = Message::builder() - .from(Mailbox::new(Some(from_name), from_address.parse()?)) - .to(to.parse()?) - .subject(subject) - .header(ContentType::TEXT_HTML) - .body(body)?; - - let username = dotenvy::var("SMTP_USERNAME")?; - let password = dotenvy::var("SMTP_PASSWORD")?; - let host = dotenvy::var("SMTP_HOST")?; - let port = dotenvy::var("SMTP_PORT")?.parse::().unwrap_or(465); - let creds = - (!username.is_empty()).then(|| Credentials::new(username, password)); - let tls_setting = match dotenvy::var("SMTP_TLS")?.as_str() { - "none" => Tls::None, - "opportunistic_start_tls" => { - Tls::Opportunistic(TlsParameters::new(host.to_string())?) - } - "requires_start_tls" => { - Tls::Required(TlsParameters::new(host.to_string())?) - } - "tls" => Tls::Wrapper(TlsParameters::new(host.to_string())?), - _ => { - warn!("Unrecognized SMTP TLS setting. Defaulting to TLS."); - Tls::Wrapper(TlsParameters::new(host.to_string())?) - } - }; - - let mut mailer = SmtpTransport::relay(&host)?.port(port).tls(tls_setting); - if let Some(creds) = creds { - mailer = mailer.credentials(creds); - } - - mailer.build().send(&email)?; - - Ok(()) -} - -pub fn send_email( - to: String, - email_title: &str, - email_description: &str, - line_two: &str, - button_info: Option<(&str, &str)>, -) -> Result<(), MailError> { - let mut email = if button_info.is_some() { - include_str!("button_notif.html") - } else { - include_str!("auth_notif.html") - } - .replace("{{ email_title }}", email_title) - .replace("{{ email_description }}", email_description) - .replace("{{ line_one }}", email_description) - .replace("{{ line_two }}", line_two); - - if let Some((button_title, button_link)) = button_info { - email = email - .replace("{{ button_title }}", button_title) - .replace("{{ button_link }}", button_link); - } - - send_email_raw(to, email_title.to_string(), email)?; - - Ok(()) -} diff --git a/apps/labrinth/src/auth/mod.rs b/apps/labrinth/src/auth/mod.rs index 051f3833..2dc9311d 100644 --- a/apps/labrinth/src/auth/mod.rs +++ b/apps/labrinth/src/auth/mod.rs @@ -1,5 +1,4 @@ pub mod checks; -pub mod email; pub mod oauth; pub mod templates; pub mod validate; @@ -8,9 +7,7 @@ pub use checks::{ filter_visible_collections, filter_visible_project_ids, filter_visible_projects, }; -pub use email::send_email; use serde::{Deserialize, Serialize}; -// pub use pat::{generate_pat, PersonalAccessToken}; pub use validate::{check_is_moderator_from_headers, get_user_from_headers}; use crate::file_hosting::FileHostingError; @@ -36,7 +33,7 @@ pub enum AuthenticationError { #[error("Error while decoding PAT: {0}")] Decoding(#[from] ariadne::ids::DecodingError), #[error("{0}")] - Mail(#[from] email::MailError), + Mail(#[from] crate::queue::email::MailError), #[error("Invalid Authentication Credentials")] InvalidCredentials, #[error("Authentication method was not valid")] diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 18e0ae46..c79d3d6e 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -1,4 +1,5 @@ use crate::database::redis::RedisPool; +use crate::queue::email::EmailQueue; use crate::queue::payouts::{ PayoutsQueue, insert_bank_balances_and_webhook, process_payout, }; @@ -18,6 +19,7 @@ pub enum BackgroundTask { IndexBilling, IndexSubscriptions, Migrations, + Mail, } impl BackgroundTask { @@ -28,6 +30,7 @@ impl BackgroundTask { search_config: search::SearchConfig, clickhouse: clickhouse::Client, stripe_client: stripe::Client, + email_queue: EmailQueue, ) { use BackgroundTask::*; match self { @@ -52,10 +55,19 @@ impl BackgroundTask { ) .await } + Mail => { + run_email(email_queue).await; + } } } } +pub async fn run_email(email_queue: EmailQueue) { + if let Err(error) = email_queue.index().await { + error!(%error, "Failed to index email queue"); + } +} + pub async fn update_bank_balances(pool: sqlx::Pool) { let payouts_queue = PayoutsQueue::new(); diff --git a/apps/labrinth/src/database/models/mod.rs b/apps/labrinth/src/database/models/mod.rs index 25f2ff11..7c5b5e60 100644 --- a/apps/labrinth/src/database/models/mod.rs +++ b/apps/labrinth/src/database/models/mod.rs @@ -10,6 +10,9 @@ pub mod image_item; pub mod legacy_loader_fields; pub mod loader_fields; pub mod notification_item; +pub mod notifications_deliveries_item; +pub mod notifications_template_item; +pub mod notifications_type_item; pub mod oauth_client_authorization_item; pub mod oauth_client_item; pub mod oauth_token_item; @@ -26,6 +29,7 @@ pub mod thread_item; pub mod user_item; pub mod user_subscription_item; pub mod users_compliance; +pub mod users_notifications_preferences_item; pub mod users_redeemals; pub mod version_item; diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index e50f1ada..eb90b4e9 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -1,6 +1,8 @@ use super::ids::*; use crate::database::{models::DatabaseError, redis::RedisPool}; -use crate::models::notifications::NotificationBody; +use crate::models::notifications::{ + NotificationBody, NotificationChannel, NotificationDeliveryStatus, +}; use chrono::{DateTime, Utc}; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; @@ -55,6 +57,10 @@ impl NotificationBuilder { .map(|_| body.clone()) .collect::>(); + let users_raw_ids = users.iter().map(|x| x.0).collect::>(); + let notification_ids = + notification_ids.iter().map(|x| x.0).collect::>(); + sqlx::query!( " INSERT INTO notifications ( @@ -62,16 +68,97 @@ impl NotificationBuilder { ) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::jsonb[]) ", - ¬ification_ids - .into_iter() - .map(|x| x.0) - .collect::>()[..], - &users.iter().map(|x| x.0).collect::>()[..], + ¬ification_ids[..], + &users_raw_ids[..], &bodies[..], ) .execute(&mut **transaction) .await?; + let notification_types = notification_ids + .iter() + .map(|_| self.body.notification_type().as_str()) + .collect::>(); + + let notification_channels = NotificationChannel::list() + .iter() + .map(|x| x.as_str()) + .collect::>(); + + // Insert required rows into `notifications_deliveries` by channel + // and notification type, based on the user's preferences. + let query = sqlx::query!( + r#" + WITH + channels AS ( + SELECT channel FROM UNNEST($1::varchar[]) AS t(channel) + ), + delivery_candidates AS ( + SELECT + ids.notification_id, + ids.user_id, + channels.channel, + nt.delivery_priority, + uprefs.enabled user_enabled, + dprefs.enabled default_enabled + FROM + UNNEST( + $2::bigint[], + $3::bigint[], + $4::varchar[] + ) AS ids(notification_id, user_id, notification_type) + CROSS JOIN channels + INNER JOIN + notifications_types nt ON nt.name = ids.notification_type + LEFT JOIN users_notifications_preferences uprefs + ON uprefs.user_id = ids.user_id + AND uprefs.channel = channels.channel + AND uprefs.notification_type = ids.notification_type + LEFT JOIN users_notifications_preferences dprefs + ON dprefs.user_id IS NULL + AND dprefs.channel = channels.channel + AND dprefs.notification_type = ids.notification_type + ) + INSERT INTO notifications_deliveries + (notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count) + SELECT + dc.notification_id, + dc.user_id, + dc.channel, + dc.delivery_priority, + CASE + -- User explicitly enabled + WHEN user_enabled = TRUE THEN $5 + + -- Is enabled by default, no preference by user + WHEN user_enabled IS NULL AND default_enabled = TRUE THEN $5 + + -- User explicitly disabled (regardless of default) + WHEN user_enabled = FALSE THEN $6 + + -- User set no preference, default disabled + WHEN user_enabled IS NULL AND default_enabled = FALSE THEN $7 + + -- At this point, user set no preference and there is no + -- default set, so treat as disabled-by-default. + ELSE $7 + END status, + NOW() next_attempt, + 0 attempt_count + FROM + delivery_candidates dc + "#, + ¬ification_channels[..] as &[&str], + ¬ification_ids[..], + &users_raw_ids[..], + ¬ification_types[..] as &[&str], + NotificationDeliveryStatus::Pending.as_str(), + NotificationDeliveryStatus::SkippedPreferences.as_str(), + NotificationDeliveryStatus::SkippedDefault.as_str(), + ); + + query.execute(&mut **transaction).await?; + DBNotification::clear_user_notifications_cache(&users, redis).await?; Ok(()) @@ -96,7 +183,7 @@ impl DBNotification { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { let notification_ids_parsed: Vec = notification_ids.iter().map(|x| x.0).collect(); @@ -144,7 +231,60 @@ impl DBNotification { .await } - pub async fn get_many_user<'a, E>( + pub async fn get_all_user<'a, E>( + user_id: DBUserId, + exec: E, + ) -> Result, DatabaseError> + where + E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + { + let db_notifications = sqlx::query!( + " + SELECT n.id, n.user_id, n.name, n.text, n.link, n.created, n.read, n.type notification_type, n.body, + JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions + FROM notifications n + LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id + WHERE n.user_id = $1 + GROUP BY n.id, n.user_id + ", + user_id as DBUserId + ) + .fetch(exec) + .map_ok(|row| { + let id = DBNotificationId(row.id); + + DBNotification { + id, + user_id: DBUserId(row.user_id), + read: row.read, + created: row.created, + body: row.body.clone().and_then(|x| serde_json::from_value(x).ok()).unwrap_or_else(|| { + if let Some(name) = row.name { + NotificationBody::LegacyMarkdown { + notification_type: row.notification_type, + name, + text: row.text.unwrap_or_default(), + link: row.link.unwrap_or_default(), + actions: serde_json::from_value( + row.actions.unwrap_or_default(), + ) + .ok() + .unwrap_or_default(), + } + } else { + NotificationBody::Unknown + } + }), + } + }) + .try_collect::>() + .await?; + + Ok(db_notifications) + } + + /// Returns user notifications that are configured to be exposed on the website. + pub async fn get_many_user_exposed_on_site<'a, E>( user_id: DBUserId, exec: E, redis: &RedisPool, @@ -171,8 +311,10 @@ impl DBNotification { JSONB_AGG(DISTINCT jsonb_build_object('id', na.id, 'notification_id', na.notification_id, 'name', na.name, 'action_route_method', na.action_route_method, 'action_route', na.action_route)) filter (where na.id is not null) actions FROM notifications n LEFT OUTER JOIN notifications_actions na on n.id = na.notification_id + INNER JOIN notifications_types nt on nt.name = n.body ->> 'type' WHERE n.user_id = $1 - GROUP BY n.id, n.user_id; + AND nt.expose_in_site_notifications = TRUE + GROUP BY n.id, n.user_id ", user_id as DBUserId ) @@ -274,6 +416,16 @@ impl DBNotification { let notification_ids_parsed: Vec = notification_ids.iter().map(|x| x.0).collect(); + sqlx::query!( + " + DELETE FROM notifications_deliveries + WHERE notification_id = ANY($1) + ", + ¬ification_ids_parsed + ) + .execute(&mut **transaction) + .await?; + sqlx::query!( " DELETE FROM notifications_actions diff --git a/apps/labrinth/src/database/models/notifications_deliveries_item.rs b/apps/labrinth/src/database/models/notifications_deliveries_item.rs new file mode 100644 index 00000000..28b67295 --- /dev/null +++ b/apps/labrinth/src/database/models/notifications_deliveries_item.rs @@ -0,0 +1,162 @@ +use super::ids::*; +use crate::database::models::DatabaseError; +use crate::models::v3::notifications::{ + NotificationChannel, NotificationDeliveryStatus, +}; +use chrono::{DateTime, Utc}; + +pub struct DBNotificationDelivery { + pub id: i64, + pub notification_id: DBNotificationId, + pub user_id: DBUserId, + pub channel: NotificationChannel, + pub delivery_priority: i32, + pub status: NotificationDeliveryStatus, + pub next_attempt: DateTime, + pub attempt_count: i32, +} + +struct NotificationDeliveryQueryResult { + id: i64, + notification_id: i64, + user_id: i64, + channel: String, + delivery_priority: i32, + status: String, + next_attempt: DateTime, + attempt_count: i32, +} + +macro_rules! select_notification_deliveries_with_predicate { + ($predicate:literal $(, $($param0:expr $(, $param:expr)* $(,)?)?)?) => { + sqlx::query_as!( + NotificationDeliveryQueryResult, + r#" + SELECT + id, notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count + FROM notifications_deliveries + "# + + $predicate + $($(, $param0 $(, $param)* )?)? + ) + }; +} + +impl From for DBNotificationDelivery { + fn from(r: NotificationDeliveryQueryResult) -> Self { + DBNotificationDelivery { + id: r.id, + notification_id: DBNotificationId(r.notification_id), + user_id: DBUserId(r.user_id), + channel: NotificationChannel::from_str_or_default(&r.channel), + delivery_priority: r.delivery_priority, + status: NotificationDeliveryStatus::from_str_or_default(&r.status), + next_attempt: r.next_attempt, + attempt_count: r.attempt_count, + } + } +} + +impl DBNotificationDelivery { + pub async fn get_all_user( + user_id: DBUserId, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result, DatabaseError> { + let user_id = user_id.0; + let results = select_notification_deliveries_with_predicate!( + "WHERE user_id = $1", + user_id + ) + .fetch_all(exec) + .await?; + + Ok(results.into_iter().map(|r| r.into()).collect()) + } + + /// Returns deliveries that should be processed next for a given channel using a row-level + /// `UPDATE` lock, barring the provided limit. + pub async fn lock_channel_processable( + channel: NotificationChannel, + limit: i64, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result, DatabaseError> { + // This follows the `idx_notifications_deliveries_composite_queue` index. + Ok(select_notification_deliveries_with_predicate!( + "WHERE + status = $3 + AND channel = $1 + AND next_attempt <= NOW() + ORDER BY + delivery_priority DESC, + next_attempt ASC + LIMIT $2 + FOR UPDATE + SKIP LOCKED + ", + channel.as_str(), + limit, + NotificationDeliveryStatus::Pending.as_str() + ) + .fetch_all(exec) + .await? + .into_iter() + .map(Into::into) + .collect()) + } + + /// Inserts the row into the table and updates its ID. + pub async fn insert( + &mut self, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result<(), DatabaseError> { + let id = sqlx::query_scalar!( + " + INSERT INTO notifications_deliveries ( + notification_id, user_id, channel, delivery_priority, status, next_attempt, attempt_count + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id + ", + self.notification_id.0, + self.user_id.0, + self.channel.as_str(), + self.delivery_priority, + self.status.as_str(), + self.next_attempt, + self.attempt_count, + ) + .fetch_one(exec) + .await?; + + self.id = id; + + Ok(()) + } + + /// Updates semantically mutable columns of the row. + pub async fn update( + &self, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result<(), DatabaseError> { + sqlx::query!( + " + UPDATE notifications_deliveries + SET + delivery_priority = $2, + status = $3, + next_attempt = $4, + attempt_count = $5 + WHERE id = $1 + ", + self.id, + self.delivery_priority, + self.status.as_str(), + self.next_attempt, + self.attempt_count, + ) + .execute(exec) + .await?; + + Ok(()) + } +} diff --git a/apps/labrinth/src/database/models/notifications_template_item.rs b/apps/labrinth/src/database/models/notifications_template_item.rs new file mode 100644 index 00000000..563b4658 --- /dev/null +++ b/apps/labrinth/src/database/models/notifications_template_item.rs @@ -0,0 +1,112 @@ +use crate::database::models::DatabaseError; +use crate::database::redis::RedisPool; +use crate::models::v3::notifications::{NotificationChannel, NotificationType}; +use serde::{Deserialize, Serialize}; + +const TEMPLATES_NAMESPACE: &str = "notifications_templates"; +const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data"; +const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes + +#[derive(Clone, Serialize, Deserialize)] +pub struct NotificationTemplate { + pub id: i64, + pub channel: NotificationChannel, + pub notification_type: NotificationType, + pub subject_line: String, + pub body_fetch_url: String, + pub plaintext_fallback: String, +} + +struct NotificationTemplateQueryResult { + id: i64, + channel: String, + notification_type: String, + subject_line: String, + body_fetch_url: String, + plaintext_fallback: String, +} + +impl From for NotificationTemplate { + fn from(r: NotificationTemplateQueryResult) -> Self { + NotificationTemplate { + id: r.id, + channel: NotificationChannel::from_str_or_default(&r.channel), + notification_type: NotificationType::from_str_or_default( + &r.notification_type, + ), + subject_line: r.subject_line, + body_fetch_url: r.body_fetch_url, + plaintext_fallback: r.plaintext_fallback, + } + } +} + +impl NotificationTemplate { + pub async fn list_channel( + channel: NotificationChannel, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + redis: &RedisPool, + ) -> Result, DatabaseError> { + let mut redis = redis.connect().await?; + + let maybe_cached_templates = redis + .get_deserialized_from_json(TEMPLATES_NAMESPACE, channel.as_str()) + .await?; + + if let Some(cached) = maybe_cached_templates { + return Ok(cached); + } + + let results = sqlx::query_as!( + NotificationTemplateQueryResult, + r#" + SELECT * FROM notifications_templates WHERE channel = $1 + "#, + channel.as_str(), + ) + .fetch_all(exec) + .await?; + + let templates = results.into_iter().map(Into::into).collect(); + + redis + .set_serialized_to_json( + TEMPLATES_NAMESPACE, + channel.as_str(), + &templates, + None, + ) + .await?; + + Ok(templates) + } + + pub async fn get_cached_html_data( + &self, + redis: &RedisPool, + ) -> Result, DatabaseError> { + let mut redis = redis.connect().await?; + redis + .get_deserialized_from_json( + TEMPLATES_HTML_DATA_NAMESPACE, + &self.id.to_string(), + ) + .await + } + + pub async fn set_cached_html_data( + &self, + data: String, + redis: &RedisPool, + ) -> Result<(), DatabaseError> { + let mut redis = redis.connect().await?; + redis + .set_serialized_to_json( + TEMPLATES_HTML_DATA_NAMESPACE, + &self.id.to_string(), + &data, + Some(HTML_DATA_CACHE_EXPIRY), + ) + .await + } +} diff --git a/apps/labrinth/src/database/models/notifications_type_item.rs b/apps/labrinth/src/database/models/notifications_type_item.rs new file mode 100644 index 00000000..4602fc0e --- /dev/null +++ b/apps/labrinth/src/database/models/notifications_type_item.rs @@ -0,0 +1,72 @@ +use crate::database::models::DatabaseError; +use crate::database::redis::RedisPool; +use crate::models::v3::notifications::NotificationType; +use serde::{Deserialize, Serialize}; + +const NOTIFICATION_TYPES_NAMESPACE: &str = "notification_types"; + +#[derive(Serialize, Deserialize)] +pub struct NotificationTypeItem { + pub name: NotificationType, + pub delivery_priority: i32, + pub expose_in_user_preferences: bool, + pub expose_in_site_notifications: bool, +} + +struct NotificationTypeQueryResult { + name: String, + delivery_priority: i32, + expose_in_user_preferences: bool, + expose_in_site_notifications: bool, +} + +impl From for NotificationTypeItem { + fn from(r: NotificationTypeQueryResult) -> Self { + NotificationTypeItem { + name: NotificationType::from_str_or_default(&r.name), + delivery_priority: r.delivery_priority, + expose_in_user_preferences: r.expose_in_user_preferences, + expose_in_site_notifications: r.expose_in_site_notifications, + } + } +} + +impl NotificationTypeItem { + pub async fn list<'a, E>( + exec: E, + redis: &RedisPool, + ) -> Result, DatabaseError> + where + E: sqlx::Executor<'a, Database = sqlx::Postgres>, + { + let mut redis = redis.connect().await?; + + let cached_types = redis + .get_deserialized_from_json(NOTIFICATION_TYPES_NAMESPACE, "all") + .await?; + + if let Some(types) = cached_types { + return Ok(types); + } + + let results = sqlx::query_as!( + NotificationTypeQueryResult, + "SELECT * FROM notifications_types" + ) + .fetch_all(exec) + .await?; + + let types = results.into_iter().map(Into::into).collect(); + + redis + .set_serialized_to_json( + NOTIFICATION_TYPES_NAMESPACE, + "all", + &types, + None, + ) + .await?; + + Ok(types) + } +} diff --git a/apps/labrinth/src/database/models/user_item.rs b/apps/labrinth/src/database/models/user_item.rs index a771ede0..2d203781 100644 --- a/apps/labrinth/src/database/models/user_item.rs +++ b/apps/labrinth/src/database/models/user_item.rs @@ -233,7 +233,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { let user = sqlx::query!( " @@ -254,7 +254,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { let users = sqlx::query!( " @@ -270,13 +270,32 @@ impl DBUser { Ok(users) } + /// Returns `false` if any of the specified user IDs do not exist. + pub async fn exists_many<'a, E>( + user_ids: &[DBUserId], + exec: E, + ) -> Result + where + E: sqlx::Executor<'a, Database = sqlx::Postgres>, + { + let ids = user_ids.iter().map(|x| x.0).collect::>(); + let count = sqlx::query_scalar!( + r#"SELECT COUNT(*) "count!" FROM users WHERE id = ANY($1)"#, + &ids + ) + .fetch_one(exec) + .await?; + + Ok(count as usize == user_ids.len()) + } + pub async fn get_projects<'a, E>( user_id: DBUserId, exec: E, redis: &RedisPool, ) -> Result, DatabaseError> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; @@ -324,7 +343,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; @@ -349,7 +368,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; @@ -373,7 +392,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; @@ -397,7 +416,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; @@ -421,7 +440,7 @@ impl DBUser { exec: E, ) -> Result, sqlx::Error> where - E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy, + E: sqlx::Executor<'a, Database = sqlx::Postgres>, { use futures::stream::TryStreamExt; diff --git a/apps/labrinth/src/database/models/users_notifications_preferences_item.rs b/apps/labrinth/src/database/models/users_notifications_preferences_item.rs new file mode 100644 index 00000000..c8014c7e --- /dev/null +++ b/apps/labrinth/src/database/models/users_notifications_preferences_item.rs @@ -0,0 +1,111 @@ +use super::ids::*; +use crate::database::models::DatabaseError; +use crate::models::v3::notifications::{NotificationChannel, NotificationType}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct UserNotificationPreference { + pub id: i64, + pub user_id: Option, + pub channel: NotificationChannel, + pub notification_type: NotificationType, + pub enabled: bool, +} + +struct UserNotificationPreferenceQueryResult { + id: i64, + user_id: Option, + channel: String, + notification_type: String, + enabled: bool, +} + +impl From + for UserNotificationPreference +{ + fn from(r: UserNotificationPreferenceQueryResult) -> Self { + UserNotificationPreference { + id: r.id, + user_id: r.user_id.map(DBUserId), + channel: NotificationChannel::from_str_or_default(&r.channel), + notification_type: NotificationType::from_str_or_default( + &r.notification_type, + ), + enabled: r.enabled, + } + } +} + +impl UserNotificationPreference { + pub async fn get_user_or_default( + user_id: DBUserId, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result, DatabaseError> { + Self::get_many_users_or_default(&[user_id], exec).await + } + + pub async fn get_many_users_or_default( + user_ids: &[DBUserId], + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result, DatabaseError> { + let results = sqlx::query!( + r#" + SELECT + COALESCE(unp.id, dnp.id) "id!", + unp.user_id, + dnp.channel "channel!", + dnp.notification_type "notification_type!", + COALESCE(unp.enabled, dnp.enabled, false) "enabled!" + FROM users_notifications_preferences dnp + LEFT JOIN users_notifications_preferences unp + ON unp.channel = dnp.channel + AND unp.notification_type = dnp.notification_type + AND unp.user_id = ANY($1::bigint[]) + "#, + &user_ids.iter().map(|x| x.0).collect::>(), + ) + .fetch_all(exec) + .await?; + + let preferences = results + .into_iter() + .map(|r| UserNotificationPreference { + id: r.id, + user_id: r.user_id.map(DBUserId), + channel: NotificationChannel::from_str_or_default(&r.channel), + notification_type: NotificationType::from_str_or_default( + &r.notification_type, + ), + enabled: r.enabled, + }) + .collect(); + + Ok(preferences) + } + + /// Inserts the row into the table and updates its ID. + pub async fn insert( + &mut self, + exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>, + ) -> Result<(), DatabaseError> { + let id = sqlx::query_scalar!( + " + INSERT INTO users_notifications_preferences ( + user_id, channel, notification_type, enabled + ) + VALUES ($1, $2, $3, $4) + RETURNING id + ", + self.user_id.map(|x| x.0), + self.channel.as_str(), + self.notification_type.as_str(), + self.enabled, + ) + .fetch_one(exec) + .await?; + + self.id = id; + + Ok(()) + } +} diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 9bcad3c6..dcdbaf9e 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -4,8 +4,8 @@ use std::time::Duration; use actix_web::web; use database::redis::RedisPool; use queue::{ - analytics::AnalyticsQueue, payouts::PayoutsQueue, session::AuthQueue, - socket::ActiveSockets, + analytics::AnalyticsQueue, email::EmailQueue, payouts::PayoutsQueue, + session::AuthQueue, socket::ActiveSockets, }; use sqlx::Postgres; use tracing::{info, warn}; @@ -58,6 +58,7 @@ pub struct LabrinthConfig { pub automated_moderation_queue: web::Data, pub rate_limiter: web::Data, pub stripe_client: stripe::Client, + pub email_queue: web::Data, } #[allow(clippy::too_many_arguments)] @@ -70,6 +71,7 @@ pub fn app_setup( file_host: Arc, maxmind: Arc, stripe_client: stripe::Client, + email_queue: EmailQueue, enable_background_tasks: bool, ) -> LabrinthConfig { info!( @@ -91,7 +93,7 @@ pub fn app_setup( }); } - let mut scheduler = scheduler::Scheduler::new(); + let scheduler = scheduler::Scheduler::new(); let limiter = web::Data::new(AsyncRateLimiter::new( redis_pool.clone(), @@ -283,6 +285,7 @@ pub fn app_setup( automated_moderation_queue, rate_limiter: limiter, stripe_client, + email_queue: web::Data::new(email_queue), } } @@ -309,6 +312,7 @@ pub fn app_config( .app_data(web::Data::new(labrinth_config.search_config.clone())) .app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.payouts_queue.clone()) + .app_data(labrinth_config.email_queue.clone()) .app_data(web::Data::new(labrinth_config.ip_salt.clone())) .app_data(web::Data::new(labrinth_config.analytics_queue.clone())) .app_data(web::Data::new(labrinth_config.clickhouse.clone())) @@ -353,6 +357,7 @@ pub fn check_env_vars() -> bool { failed |= check_var::("SITE_URL"); failed |= check_var::("CDN_URL"); failed |= check_var::("LABRINTH_ADMIN_KEY"); + failed |= check_var::("LABRINTH_EXTERNAL_NOTIFICATION_KEY"); failed |= check_var::("RATE_LIMIT_IGNORE_KEY"); failed |= check_var::("DATABASE_URL"); failed |= check_var::("MEILISEARCH_ADDR"); @@ -449,6 +454,8 @@ pub fn check_env_vars() -> bool { failed |= check_var::("SMTP_HOST"); failed |= check_var::("SMTP_PORT"); failed |= check_var::("SMTP_TLS"); + failed |= check_var::("SMTP_FROM_NAME"); + failed |= check_var::("SMTP_FROM_ADDRESS"); failed |= check_var::("SITE_VERIFY_EMAIL_PATH"); failed |= check_var::("SITE_RESET_PASSWORD_PATH"); diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index c3942b19..8cbe4394 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -2,9 +2,11 @@ use actix_web::middleware::from_fn; use actix_web::{App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use clap::Parser; +use labrinth::app_config; use labrinth::background_task::BackgroundTask; use labrinth::database::redis::RedisPool; use labrinth::file_hosting::{S3BucketConfig, S3Host}; +use labrinth::queue::email::EmailQueue; use labrinth::search; use labrinth::util::env::parse_var; use labrinth::util::ratelimit::rate_limit_middleware; @@ -134,10 +136,20 @@ async fn main() -> std::io::Result<()> { let stripe_client = stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap()); + let email_queue = + EmailQueue::init(pool.clone(), redis_pool.clone()).unwrap(); + if let Some(task) = args.run_background_task { info!("Running task {task:?} and exiting"); - task.run(pool, redis_pool, search_config, clickhouse, stripe_client) - .await; + task.run( + pool, + redis_pool, + search_config, + clickhouse, + stripe_client, + email_queue, + ) + .await; return Ok(()); } @@ -174,12 +186,12 @@ async fn main() -> std::io::Result<()> { file_host.clone(), maxmind_reader.clone(), stripe_client, + email_queue, !args.no_background_tasks, ); info!("Starting Actix HTTP server!"); - // Init App HttpServer::new(move || { App::new() .wrap(TracingLogger::default()) @@ -187,7 +199,7 @@ async fn main() -> std::io::Result<()> { .wrap(from_fn(rate_limit_middleware)) .wrap(actix_web::middleware::Compress::default()) .wrap(sentry_actix::Sentry::new()) - .configure(|cfg| labrinth::app_config(cfg, labrinth_config.clone())) + .configure(|cfg| app_config(cfg, labrinth_config.clone())) }) .bind(dotenvy::var("BIND_ADDR").unwrap())? .run() diff --git a/apps/labrinth/src/models/v2/notifications.rs b/apps/labrinth/src/models/v2/notifications.rs index 59626b0a..16a94137 100644 --- a/apps/labrinth/src/models/v2/notifications.rs +++ b/apps/labrinth/src/models/v2/notifications.rs @@ -72,6 +72,30 @@ pub enum LegacyNotificationBody { link: String, actions: Vec, }, + // In `NotificationBody`, this has the `flow` field, however, don't + // include it here, to be 100% certain we don't end up leaking it + // in site notifications. + ResetPassword, + // Idem as ResetPassword + VerifyEmail, + AuthProviderAdded { + provider: String, + }, + AuthProviderRemoved { + provider: String, + }, + TwoFactorEnabled, + TwoFactorRemoved, + PasswordChanged, + PasswordRemoved, + EmailChanged { + new_email: String, + to_email: String, + }, + PaymentFailed { + amount: String, + service: String, + }, Unknown, } @@ -93,6 +117,36 @@ impl LegacyNotification { NotificationBody::ModeratorMessage { .. } => { Some("moderator_message".to_string()) } + NotificationBody::ResetPassword { .. } => { + Some("reset_password".to_string()) + } + NotificationBody::VerifyEmail { .. } => { + Some("verify_email".to_string()) + } + NotificationBody::AuthProviderAdded { .. } => { + Some("auth_provider_added".to_string()) + } + NotificationBody::AuthProviderRemoved { .. } => { + Some("auth_provider_removed".to_string()) + } + NotificationBody::TwoFactorEnabled => { + Some("two_factor_enabled".to_string()) + } + NotificationBody::TwoFactorRemoved => { + Some("two_factor_removed".to_string()) + } + NotificationBody::PasswordChanged => { + Some("password_changed".to_string()) + } + NotificationBody::PasswordRemoved => { + Some("password_removed".to_string()) + } + NotificationBody::EmailChanged { .. } => { + Some("email_changed".to_string()) + } + NotificationBody::PaymentFailed { .. } => { + Some("payment_failed".to_string()) + } NotificationBody::LegacyMarkdown { notification_type, .. } => notification_type.clone(), @@ -162,6 +216,40 @@ impl LegacyNotification { link, actions, }, + NotificationBody::ResetPassword { .. } => { + LegacyNotificationBody::ResetPassword + } + NotificationBody::VerifyEmail { .. } => { + LegacyNotificationBody::VerifyEmail + } + NotificationBody::AuthProviderAdded { provider } => { + LegacyNotificationBody::AuthProviderAdded { provider } + } + NotificationBody::AuthProviderRemoved { provider } => { + LegacyNotificationBody::AuthProviderRemoved { provider } + } + NotificationBody::TwoFactorEnabled => { + LegacyNotificationBody::TwoFactorEnabled + } + NotificationBody::TwoFactorRemoved => { + LegacyNotificationBody::TwoFactorRemoved + } + NotificationBody::PasswordChanged => { + LegacyNotificationBody::PasswordChanged + } + NotificationBody::PasswordRemoved => { + LegacyNotificationBody::PasswordRemoved + } + NotificationBody::EmailChanged { + new_email, + to_email, + } => LegacyNotificationBody::EmailChanged { + new_email, + to_email, + }, + NotificationBody::PaymentFailed { amount, service } => { + LegacyNotificationBody::PaymentFailed { amount, service } + } NotificationBody::Unknown => LegacyNotificationBody::Unknown, }; diff --git a/apps/labrinth/src/models/v3/billing.rs b/apps/labrinth/src/models/v3/billing.rs index cb9eb0f9..5d561ceb 100644 --- a/apps/labrinth/src/models/v3/billing.rs +++ b/apps/labrinth/src/models/v3/billing.rs @@ -33,6 +33,20 @@ pub enum ProductMetadata { }, } +impl ProductMetadata { + pub fn is_pyro(&self) -> bool { + matches!(self, ProductMetadata::Pyro { .. }) + } + + pub fn is_medal(&self) -> bool { + matches!(self, ProductMetadata::Medal { .. }) + } + + pub fn is_midas(&self) -> bool { + matches!(self, ProductMetadata::Midas) + } +} + #[derive(Serialize, Deserialize)] pub struct ProductPrice { pub id: ProductPriceId, diff --git a/apps/labrinth/src/models/v3/notifications.rs b/apps/labrinth/src/models/v3/notifications.rs index 6cbd275d..045be3cf 100644 --- a/apps/labrinth/src/models/v3/notifications.rs +++ b/apps/labrinth/src/models/v3/notifications.rs @@ -1,11 +1,13 @@ -use super::ids::OrganizationId; +use super::ids::*; use crate::database::models::notification_item::DBNotification; use crate::database::models::notification_item::DBNotificationAction; +use crate::database::models::notifications_deliveries_item::DBNotificationDelivery; use crate::models::ids::{ NotificationId, ProjectId, ReportId, TeamId, ThreadId, ThreadMessageId, VersionId, }; use crate::models::projects::ProjectStatus; +use crate::routes::ApiError; use ariadne::ids::UserId; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -24,6 +26,76 @@ pub struct Notification { pub actions: Vec, } +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum NotificationType { + // If adding a notification type, add a variant in `NotificationBody` of the same name! + ProjectUpdate, + TeamInvite, + OrganizationInvite, + StatusChange, + ModeratorMessage, + LegacyMarkdown, + ResetPassword, + VerifyEmail, + AuthProviderAdded, + AuthProviderRemoved, + TwoFactorEnabled, + TwoFactorRemoved, + PasswordChanged, + PasswordRemoved, + EmailChanged, + PaymentFailed, + Unknown, +} + +impl NotificationType { + pub fn as_str(self) -> &'static str { + match self { + NotificationType::ProjectUpdate => "project_update", + NotificationType::TeamInvite => "team_invite", + NotificationType::OrganizationInvite => "organization_invite", + NotificationType::StatusChange => "status_change", + NotificationType::ModeratorMessage => "moderator_message", + NotificationType::LegacyMarkdown => "legacy_markdown", + NotificationType::ResetPassword => "reset_password", + NotificationType::VerifyEmail => "verify_email", + NotificationType::AuthProviderAdded => "auth_provider_added", + NotificationType::AuthProviderRemoved => "auth_provider_removed", + NotificationType::TwoFactorEnabled => "two_factor_enabled", + NotificationType::TwoFactorRemoved => "two_factor_removed", + NotificationType::PasswordChanged => "password_changed", + NotificationType::PasswordRemoved => "password_removed", + NotificationType::EmailChanged => "email_changed", + NotificationType::PaymentFailed => "payment_failed", + NotificationType::Unknown => "unknown", + } + } + + pub fn from_str_or_default(s: &str) -> Self { + match s { + "project_update" => NotificationType::ProjectUpdate, + "team_invite" => NotificationType::TeamInvite, + "organization_invite" => NotificationType::OrganizationInvite, + "status_change" => NotificationType::StatusChange, + "moderator_message" => NotificationType::ModeratorMessage, + "legacy_markdown" => NotificationType::LegacyMarkdown, + "reset_password" => NotificationType::ResetPassword, + "verify_email" => NotificationType::VerifyEmail, + "auth_provider_added" => NotificationType::AuthProviderAdded, + "auth_provider_removed" => NotificationType::AuthProviderRemoved, + "two_factor_enabled" => NotificationType::TwoFactorEnabled, + "two_factor_removed" => NotificationType::TwoFactorRemoved, + "password_changed" => NotificationType::PasswordChanged, + "password_removed" => NotificationType::PasswordRemoved, + "email_changed" => NotificationType::EmailChanged, + "payment_failed" => NotificationType::PaymentFailed, + "unknown" => NotificationType::Unknown, + _ => NotificationType::Unknown, + } + } +} + #[derive(Serialize, Deserialize, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum NotificationBody { @@ -62,9 +134,87 @@ pub enum NotificationBody { link: String, actions: Vec, }, + ResetPassword { + flow: String, + }, + VerifyEmail { + flow: String, + }, + AuthProviderAdded { + provider: String, + }, + AuthProviderRemoved { + provider: String, + }, + TwoFactorEnabled, + TwoFactorRemoved, + PasswordChanged, + PasswordRemoved, + EmailChanged { + new_email: String, + to_email: String, + }, + PaymentFailed { + amount: String, + service: String, + }, Unknown, } +impl NotificationBody { + pub fn notification_type(&self) -> NotificationType { + match &self { + NotificationBody::ProjectUpdate { .. } => { + NotificationType::ProjectUpdate + } + NotificationBody::TeamInvite { .. } => NotificationType::TeamInvite, + NotificationBody::OrganizationInvite { .. } => { + NotificationType::OrganizationInvite + } + NotificationBody::StatusChange { .. } => { + NotificationType::StatusChange + } + NotificationBody::ModeratorMessage { .. } => { + NotificationType::ModeratorMessage + } + NotificationBody::LegacyMarkdown { .. } => { + NotificationType::LegacyMarkdown + } + NotificationBody::ResetPassword { .. } => { + NotificationType::ResetPassword + } + NotificationBody::VerifyEmail { .. } => { + NotificationType::VerifyEmail + } + NotificationBody::AuthProviderAdded { .. } => { + NotificationType::AuthProviderAdded + } + NotificationBody::AuthProviderRemoved { .. } => { + NotificationType::AuthProviderRemoved + } + NotificationBody::TwoFactorEnabled => { + NotificationType::TwoFactorEnabled + } + NotificationBody::TwoFactorRemoved => { + NotificationType::TwoFactorRemoved + } + NotificationBody::PasswordChanged => { + NotificationType::PasswordChanged + } + NotificationBody::PasswordRemoved => { + NotificationType::PasswordRemoved + } + NotificationBody::EmailChanged { .. } => { + NotificationType::EmailChanged + } + NotificationBody::PaymentFailed { .. } => { + NotificationType::PaymentFailed + } + NotificationBody::Unknown => NotificationType::Unknown, + } + } +} + impl From for Notification { fn from(notif: DBNotification) -> Self { let (name, text, link, actions) = { @@ -173,6 +323,13 @@ impl From for Notification { }, vec![], ), + // Don't expose the `flow` field + NotificationBody::ResetPassword { .. } => ( + "Password reset requested".to_string(), + "You've requested to reset your password. Please check your email for a reset link.".to_string(), + "#".to_string(), + vec![], + ), NotificationBody::LegacyMarkdown { name, text, @@ -185,6 +342,64 @@ impl From for Notification { link.clone(), actions.clone().into_iter().collect(), ), + // The notifications from here to down below are listed with messages for completeness' sake, + // though they should never be sent via site notifications. This should be disabled via database + // options. Messages should be reviewed and worded better if we want to distribute these notifications + // via the site. + NotificationBody::PaymentFailed { .. } => ( + "Payment failed".to_string(), + "A payment on your account failed. Please update your billing information.".to_string(), + "/settings/billing".to_string(), + vec![], + ), + NotificationBody::VerifyEmail { .. } => ( + "Verify your email".to_string(), + "You've requested to verify your email. Please check your email for a verification link.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::AuthProviderAdded { .. } => ( + "Auth provider added".to_string(), + "You've added a new authentication provider to your account.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::AuthProviderRemoved { .. } => ( + "Auth provider removed".to_string(), + "You've removed a authentication provider from your account.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::TwoFactorEnabled => ( + "Two-factor authentication enabled".to_string(), + "You've enabled two-factor authentication on your account.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::TwoFactorRemoved => ( + "Two-factor authentication removed".to_string(), + "You've removed two-factor authentication from your account.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::PasswordChanged => ( + "Password changed".to_string(), + "You've changed your account password.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::PasswordRemoved => ( + "Password removed".to_string(), + "You've removed your account password.".to_string(), + "#".to_string(), + vec![], + ), + NotificationBody::EmailChanged { .. } => ( + "Email changed".to_string(), + "Your account email was changed.".to_string(), + "#".to_string(), + vec![], + ), NotificationBody::Unknown => { ("".to_string(), "".to_string(), "#".to_string(), vec![]) } @@ -221,3 +436,104 @@ impl From for NotificationAction { } } } + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum NotificationChannel { + Email, +} + +impl NotificationChannel { + pub fn list() -> &'static [Self] { + &[NotificationChannel::Email] + } + + pub fn as_str(self) -> &'static str { + match self { + NotificationChannel::Email => "email", + } + } + + pub fn from_str_or_default(s: &str) -> Self { + match s { + "email" => NotificationChannel::Email, + _ => NotificationChannel::Email, + } + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum NotificationDeliveryStatus { + Pending, + SkippedPreferences, + SkippedDefault, + Delivered, + PermanentlyFailed, +} + +impl NotificationDeliveryStatus { + pub fn as_user_error(self) -> Result<(), ApiError> { + match self { + NotificationDeliveryStatus::Delivered => Ok(()), + NotificationDeliveryStatus::SkippedPreferences | + NotificationDeliveryStatus::SkippedDefault | + NotificationDeliveryStatus::Pending => Err(ApiError::InvalidInput("An error occured while sending an email to your email address. Please try again later.".to_owned())), + NotificationDeliveryStatus::PermanentlyFailed => Err(ApiError::InvalidInput("This email address doesn't exist! Please try another one.".to_owned())), + } + } + + pub fn as_str(self) -> &'static str { + match self { + NotificationDeliveryStatus::Pending => "pending", + NotificationDeliveryStatus::SkippedPreferences => { + "skipped_preferences" + } + NotificationDeliveryStatus::SkippedDefault => "skipped_default", + NotificationDeliveryStatus::Delivered => "delivered", + NotificationDeliveryStatus::PermanentlyFailed => { + "permanently_failed" + } + } + } + + pub fn from_str_or_default(s: &str) -> Self { + match s { + "pending" => NotificationDeliveryStatus::Pending, + "skipped_preferences" => { + NotificationDeliveryStatus::SkippedPreferences + } + "skipped_default" => NotificationDeliveryStatus::SkippedDefault, + "delivered" => NotificationDeliveryStatus::Delivered, + "permanently_failed" => { + NotificationDeliveryStatus::PermanentlyFailed + } + _ => NotificationDeliveryStatus::Pending, + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct NotificationDelivery { + pub notification_id: NotificationId, + pub user_id: UserId, + pub channel: NotificationChannel, + pub delivery_priority: i32, + pub status: NotificationDeliveryStatus, + pub next_attempt: DateTime, + pub attempt_count: i32, +} + +impl From for NotificationDelivery { + fn from(delivery: DBNotificationDelivery) -> Self { + Self { + notification_id: delivery.notification_id.into(), + user_id: delivery.user_id.into(), + channel: delivery.channel, + delivery_priority: delivery.delivery_priority, + status: delivery.status, + next_attempt: delivery.next_attempt, + attempt_count: delivery.attempt_count, + } + } +} diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs new file mode 100644 index 00000000..ddaeeacf --- /dev/null +++ b/apps/labrinth/src/queue/email.rs @@ -0,0 +1,339 @@ +use crate::database::models::ids::*; +use crate::database::models::notification_item::DBNotification; +use crate::database::models::notifications_deliveries_item::DBNotificationDelivery; +use crate::database::models::notifications_template_item::NotificationTemplate; +use crate::database::models::user_item::DBUser; +use crate::database::redis::RedisPool; +use crate::models::notifications::NotificationBody; +use crate::models::v3::notifications::{ + NotificationChannel, NotificationDeliveryStatus, +}; +use crate::routes::ApiError; +use chrono::Utc; +use futures::stream::{FuturesUnordered, StreamExt}; +use lettre::message::Mailbox; +use lettre::transport::smtp::authentication::Credentials; +use lettre::transport::smtp::client::{Tls, TlsParameters}; +use lettre::{AsyncSmtpTransport, AsyncTransport, Tokio1Executor}; +use reqwest::Client; +use sqlx::PgPool; +use std::sync::Arc; +use thiserror::Error; +use tokio::sync::Mutex as TokioMutex; +use tracing::{error, info, instrument, warn}; + +const EMAIL_RETRY_DELAY_SECONDS: i64 = 10; + +pub enum Mailer { + Uninitialized, + Initialized(Arc>), +} + +impl Mailer { + pub async fn to_transport( + &mut self, + ) -> Result>, MailError> { + let maybe_transport = match self { + Mailer::Uninitialized => { + let username = dotenvy::var("SMTP_USERNAME")?; + let password = dotenvy::var("SMTP_PASSWORD")?; + let host = dotenvy::var("SMTP_HOST")?; + let port = + dotenvy::var("SMTP_PORT")?.parse::().unwrap_or(465); + + let creds = (!username.is_empty()) + .then(|| Credentials::new(username, password)); + + let tls_setting = match dotenvy::var("SMTP_TLS")?.as_str() { + "none" => Tls::None, + "opportunistic_start_tls" => Tls::Opportunistic( + TlsParameters::new(host.to_string())?, + ), + "requires_start_tls" => { + Tls::Required(TlsParameters::new(host.to_string())?) + } + "tls" => { + Tls::Wrapper(TlsParameters::new(host.to_string())?) + } + _ => { + warn!( + "Unrecognized SMTP TLS setting. Defaulting to TLS." + ); + Tls::Wrapper(TlsParameters::new(host.to_string())?) + } + }; + + let mut mailer = + AsyncSmtpTransport::::relay(&host)? + .port(port) + .tls(tls_setting); + + if let Some(creds) = creds { + mailer = mailer.credentials(creds); + } + + let mailer = mailer.build(); + + let result = mailer.test_connection().await; + + match &result { + Ok(true) => Some(Arc::new(mailer)), + Ok(false) => { + error!("SMTP NOOP failed, disabling mailer"); + None + } + Err(error) => { + error!(%error, "Failed to test SMTP connection, disabling mailer"); + None + } + } + } + Mailer::Initialized(transport) => Some(Arc::clone(transport)), + }; + + let transport = + maybe_transport.ok_or_else(|| MailError::Uninitialized)?; + *self = Mailer::Initialized(Arc::clone(&transport)); + Ok(transport) + } +} + +#[derive(Error, Debug)] +pub enum MailError { + #[error("Environment Error")] + Env(#[from] dotenvy::Error), + #[error("Mail Error: {0}")] + Mail(#[from] lettre::error::Error), + #[error("Address Parse Error: {0}")] + Address(#[from] lettre::address::AddressError), + #[error("SMTP Error: {0}")] + Smtp(#[from] lettre::transport::smtp::Error), + #[error("Couldn't initialize SMTP transport")] + Uninitialized, + #[error("HTTP error fetching template: {0}")] + HttpTemplate(#[from] reqwest::Error), +} + +#[derive(Clone)] +pub struct EmailQueue { + pg: PgPool, + client: reqwest::Client, + redis: RedisPool, + mailer: Arc>, + identity: templates::MailingIdentity, +} + +impl EmailQueue { + /// Initializes the email queue from environment variables, and tests the SMTP connection. + /// + /// # Panic + /// + /// Panics if a TLS backend cannot be initialized by [`reqwest::ClientBuilder`]. + pub fn init(pg: PgPool, redis: RedisPool) -> Result { + Ok(Self { + pg, + redis, + mailer: Arc::new(TokioMutex::new(Mailer::Uninitialized)), + identity: templates::MailingIdentity::from_env()?, + client: Client::builder() + .user_agent("Modrinth") + .build() + .expect("Failed to build HTTP client"), + }) + } + + #[instrument(name = "EmailQueue::index", skip_all)] + pub async fn index(&self) -> Result<(), ApiError> { + let transport = self.mailer.lock().await.to_transport().await?; + + let begin = std::time::Instant::now(); + + let mut deliveries = DBNotificationDelivery::lock_channel_processable( + NotificationChannel::Email, + 50, + &self.pg, + ) + .await?; + + if deliveries.is_empty() { + return Ok(()); + } + + let n_to_process = deliveries.len(); + + // Auto-fail deliveries which have been attempted over 3 times to avoid + // ballooning the error rate. + for d in deliveries.iter_mut().filter(|d| d.attempt_count >= 3) { + d.status = NotificationDeliveryStatus::PermanentlyFailed; + d.update(&self.pg).await?; + } + + // We hold a FOR UPDATE lock on the rows here, so no other workers are accessing them + // at the same time. + + let notification_ids = deliveries + .iter() + .filter(|d| d.attempt_count < 3) + .map(|d| d.notification_id) + .collect::>(); + let notifications = + DBNotification::get_many(¬ification_ids, &self.pg).await?; + + // For all notifications we collected, fill out the template + // and send it via SMTP in parallel. + + let mut futures = FuturesUnordered::new(); + + for notification in notifications { + let this = self.clone(); + let transport = Arc::clone(&transport); + + futures.push(async move { + let mut txn = this.pg.begin().await?; + + let maybe_user = DBUser::get_id( + notification.user_id, + &mut *txn, + &this.redis, + ) + .await?; + + let Some(mailbox) = maybe_user + .and_then(|user| user.email) + .and_then(|email| email.parse().ok()) + else { + return Ok(( + notification.id, + NotificationDeliveryStatus::SkippedPreferences, + )); + }; + + this.send_one_with_transport( + &mut txn, + transport, + notification.body, + notification.user_id, + mailbox, + ) + .await + .map(|status| (notification.id, status)) + }); + } + + while let Some(result) = futures.next().await { + match result { + Ok((notification_id, status)) => { + if let Some(idx) = deliveries + .iter() + .position(|d| d.notification_id == notification_id) + { + let update_next_attempt = + status == NotificationDeliveryStatus::Pending; + + let mut delivery = deliveries.remove(idx); + delivery.status = status; + delivery.next_attempt += if update_next_attempt { + chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS) + } else { + chrono::Duration::seconds(0) + }; + + delivery.attempt_count += 1; + delivery.update(&self.pg).await?; + } + } + + Err(error) => error!(%error, "Error building email"), + } + } + + for mut delivery in deliveries { + // For these, there was an error building the email, like a + // database error. Retry them after a delay. + + delivery.next_attempt = Utc::now() + + chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS); + + delivery.update(&self.pg).await?; + } + + info!( + "Processed {} email deliveries in {}ms", + n_to_process, + begin.elapsed().as_millis() + ); + + Ok(()) + } + + pub async fn send_one( + &self, + txn: &mut sqlx::PgTransaction<'_>, + notification: NotificationBody, + user_id: DBUserId, + address: Mailbox, + ) -> Result { + let transport = self.mailer.lock().await.to_transport().await?; + self.send_one_with_transport( + txn, + transport, + notification, + user_id, + address, + ) + .await + } + + async fn send_one_with_transport( + &self, + txn: &mut sqlx::PgTransaction<'_>, + transport: Arc>, + notification: NotificationBody, + user_id: DBUserId, + address: Mailbox, + ) -> Result { + // If there isn't any template present in the database for the + // notification type, skip it. + + let Some(template) = NotificationTemplate::list_channel( + NotificationChannel::Email, + &mut **txn, + &self.redis, + ) + .await? + .into_iter() + .find(|t| t.notification_type == notification.notification_type()) else { + return Ok(NotificationDeliveryStatus::SkippedDefault); + }; + + let message = templates::build_email( + &mut **txn, + &self.redis, + &self.client, + user_id, + ¬ification, + &template, + self.identity.clone(), + address, + ) + .await?; + + let send_result = transport.send(message).await; + + Ok(send_result.map_or_else(|error| { + error!(%error, smtp.code = ?extract_smtp_code(&error), "Error sending email"); + + if error.is_permanent() { + NotificationDeliveryStatus::PermanentlyFailed + } else { + NotificationDeliveryStatus::Pending + } + }, |_| NotificationDeliveryStatus::Delivered)) + } +} + +fn extract_smtp_code(e: &lettre::transport::smtp::Error) -> Option { + e.status().map(|x| x.into()) +} + +mod templates; diff --git a/apps/labrinth/src/queue/email/templates.rs b/apps/labrinth/src/queue/email/templates.rs new file mode 100644 index 00000000..4e269e4d --- /dev/null +++ b/apps/labrinth/src/queue/email/templates.rs @@ -0,0 +1,403 @@ +use super::MailError; +use crate::database::models::DBUser; +use crate::database::models::DatabaseError; +use crate::database::models::ids::*; +use crate::database::models::notifications_template_item::NotificationTemplate; +use crate::database::redis::RedisPool; +use crate::models::v3::notifications::NotificationBody; +use crate::routes::ApiError; +use futures::TryFutureExt; +use lettre::Message; +use lettre::message::{Mailbox, MultiPart, SinglePart}; +use sqlx::query; +use std::collections::HashMap; +use std::time::Duration; +use tracing::{error, warn}; + +const USER_NAME: &str = "user.name"; +const USER_EMAIL: &str = "user.email"; + +const RESETPASSWORD_URL: &str = "resetpassword.url"; +const VERIFYEMAIL_URL: &str = "verifyemail.url"; +const AUTHPROVIDER_NAME: &str = "authprovider.name"; +const EMAILCHANGED_NEW_EMAIL: &str = "emailchanged.new_email"; +const BILLING_URL: &str = "billing.url"; + +const PAYMENTFAILED_AMOUNT: &str = "paymentfailed.amount"; +const PAYMENTFAILED_SERVICE: &str = "paymentfailed.service"; + +const TEAMINVITE_INVITER_NAME: &str = "teaminvite.inviter.name"; +const TEAMINVITE_PROJECT_NAME: &str = "teaminvite.project.name"; +const TEAMINVITE_ROLE_NAME: &str = "teaminvite.role.name"; + +const ORGINVITE_INVITER_NAME: &str = "organizationinvite.inviter.name"; +const ORGINVITE_ORG_NAME: &str = "organizationinvite.organization.name"; +const ORGINVITE_ROLE_NAME: &str = "organizationinvite.role.name"; + +const STATUSCHANGE_PROJECT_NAME: &str = "statuschange.project.name"; +const STATUSCHANGE_OLD_STATUS: &str = "statuschange.old.status"; +const STATUSCHANGE_NEW_STATUS: &str = "statuschange.new.status"; + +#[derive(Clone)] +pub struct MailingIdentity { + from_name: String, + from_address: String, + reply_name: Option, + reply_address: Option, +} + +impl MailingIdentity { + pub fn from_env() -> dotenvy::Result { + Ok(Self { + from_name: dotenvy::var("SMTP_FROM_NAME")?, + from_address: dotenvy::var("SMTP_FROM_ADDRESS")?, + reply_name: dotenvy::var("SMTP_REPLY_TO_NAME").ok(), + reply_address: dotenvy::var("SMTP_REPLY_TO_ADDRESS").ok(), + }) + } +} + +#[allow(clippy::too_many_arguments)] +pub async fn build_email( + exec: impl sqlx::PgExecutor<'_>, + redis: &RedisPool, + client: &reqwest::Client, + user_id: DBUserId, + body: &NotificationBody, + template: &NotificationTemplate, + from: MailingIdentity, + to: Mailbox, +) -> Result { + let get_html_body = async { + let result: Result, ApiError> = + match template.get_cached_html_data(redis).await? { + Some(html_body) => Ok(Ok(html_body)), + None => { + let result = client + .get(&template.body_fetch_url) + .timeout(Duration::from_secs(3)) + .send() + .and_then(|res| async move { res.error_for_status() }) + .and_then(|res| res.text()) + .await; + + if let Ok(ref body) = result { + template + .set_cached_html_data(body.clone(), redis) + .await?; + } + + Ok(result) + } + }; + + result + }; + + let MailingIdentity { + from_name, + from_address, + reply_name, + reply_address, + } = from; + + let (html_body_result, mut variables) = futures::try_join!( + get_html_body, + collect_template_variables(exec, redis, user_id, body) + )?; + + variables.insert(USER_EMAIL, to.email.to_string()); + + let mut message_builder = Message::builder().from(Mailbox::new( + Some(from_name), + from_address.parse().map_err(MailError::from)?, + )); + + if let Some((name, address)) = reply_name.zip(reply_address) { + message_builder = message_builder.reply_to(Mailbox::new( + Some(name), + address.parse().map_err(MailError::from)?, + )); + } + + message_builder = message_builder.to(to).subject(&template.subject_line); + + let plaintext_filled_body = + fill_template(&template.plaintext_fallback, &variables); + + let email_message = match html_body_result { + Ok(html_body) => { + let html_filled_body = fill_template(&html_body, &variables); + message_builder + .multipart(MultiPart::alternative_plain_html( + plaintext_filled_body, + html_filled_body, + )) + .map_err(MailError::from)? + } + + Err(error) => { + error!(%error, "Failed to fetch template body"); + message_builder + .singlepart(SinglePart::plain(plaintext_filled_body)) + .map_err(MailError::from)? + } + }; + + Ok(email_message) +} + +fn fill_template( + mut text: &str, + variables: &HashMap<&'static str, String>, +) -> String { + let mut buffer = String::with_capacity(text.len()); + + loop { + if let Some((previous, start_variable)) = text.split_once('{') { + buffer.push_str(previous); + + if let Some((variable_name, rest)) = start_variable.split_once('}') + { + // Replace variable with an empty string if it isn't matched + buffer.push_str( + variables + .get(variable_name) + .map(|s| s.as_str()) + .unwrap_or_default(), + ); + text = rest; + } else { + warn!("Unmatched open brace in template"); + text = start_variable; + } + } else { + buffer.push_str(text); + break; + } + } + + buffer +} + +async fn collect_template_variables( + exec: impl sqlx::PgExecutor<'_>, + redis: &RedisPool, + user_id: DBUserId, + n: &NotificationBody, +) -> Result, ApiError> { + async fn only_select_default_variables( + exec: impl sqlx::PgExecutor<'_>, + redis: &RedisPool, + user_id: DBUserId, + ) -> Result, ApiError> { + let mut map = HashMap::new(); + + let user = DBUser::get_id(user_id, exec, redis) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?; + + map.insert(USER_NAME, user.username); + Ok(map) + } + + match &n { + NotificationBody::TeamInvite { + team_id: _, + project_id, + invited_by, + role, + } => { + let result = query!( + r#" + SELECT + users.username "user_name!", + users.email "user_email", + inviter.username "inviter_name!", + project.name "project_name!" + FROM users + INNER JOIN users inviter ON inviter.id = $1 + INNER JOIN mods project ON project.id = $2 + WHERE users.id = $3 + "#, + invited_by.0 as i64, + project_id.0 as i64, + user_id.0 as i64 + ) + .fetch_one(exec) + .await?; + + let mut map = HashMap::new(); + map.insert(USER_NAME, result.user_name); + map.insert(TEAMINVITE_INVITER_NAME, result.inviter_name); + map.insert(TEAMINVITE_PROJECT_NAME, result.project_name); + map.insert(TEAMINVITE_ROLE_NAME, role.clone()); + + Ok(map) + } + + NotificationBody::OrganizationInvite { + organization_id, + invited_by, + team_id: _, + role, + } => { + let result = query!( + r#" + SELECT + users.username "user_name!", + users.email "user_email", + inviter.username "inviter_name!", + organization.name "organization_name!" + FROM users + INNER JOIN users inviter ON inviter.id = $1 + INNER JOIN organizations organization ON organization.id = $2 + WHERE users.id = $3 + "#, + invited_by.0 as i64, + organization_id.0 as i64, + user_id.0 as i64 + ) + .fetch_one(exec) + .await?; + + let mut map = HashMap::new(); + map.insert(USER_NAME, result.user_name); + map.insert(ORGINVITE_INVITER_NAME, result.inviter_name); + map.insert(ORGINVITE_ORG_NAME, result.organization_name); + map.insert(ORGINVITE_ROLE_NAME, role.clone()); + + Ok(map) + } + + NotificationBody::StatusChange { + project_id, + old_status, + new_status, + } => { + let result = query!( + r#" + SELECT + users.username "user_name!", + users.email "user_email", + project.name "project_name!" + FROM users + INNER JOIN mods project ON project.id = $1 + WHERE users.id = $2 + "#, + project_id.0 as i64, + user_id.0 as i64, + ) + .fetch_one(exec) + .await?; + + let mut map = HashMap::new(); + map.insert(USER_NAME, result.user_name); + map.insert(STATUSCHANGE_PROJECT_NAME, result.project_name); + map.insert(STATUSCHANGE_OLD_STATUS, old_status.as_str().to_owned()); + map.insert(STATUSCHANGE_NEW_STATUS, new_status.as_str().to_owned()); + + Ok(map) + } + + NotificationBody::ResetPassword { flow } => { + let url = format!( + "{}/{}?flow={}", + dotenvy::var("SITE_URL")?, + dotenvy::var("SITE_RESET_PASSWORD_PATH")?, + flow + ); + + let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( + || DatabaseError::Database(sqlx::Error::RowNotFound), + )?; + + let mut map = HashMap::new(); + map.insert(RESETPASSWORD_URL, url); + map.insert(USER_NAME, user.username); + + Ok(map) + } + + NotificationBody::VerifyEmail { flow } => { + let url = format!( + "{}/{}?flow={}", + dotenvy::var("SITE_URL")?, + dotenvy::var("SITE_VERIFY_EMAIL_PATH")?, + flow + ); + + let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( + || DatabaseError::Database(sqlx::Error::RowNotFound), + )?; + + let mut map = HashMap::new(); + map.insert(VERIFYEMAIL_URL, url); + map.insert(USER_NAME, user.username); + + Ok(map) + } + + NotificationBody::AuthProviderAdded { provider } + | NotificationBody::AuthProviderRemoved { provider } => { + let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( + || DatabaseError::Database(sqlx::Error::RowNotFound), + )?; + + let mut map = HashMap::new(); + map.insert(USER_NAME, user.username); + map.insert(AUTHPROVIDER_NAME, provider.clone()); + + Ok(map) + } + + NotificationBody::TwoFactorEnabled + | NotificationBody::TwoFactorRemoved + | NotificationBody::PasswordChanged + | NotificationBody::PasswordRemoved => { + only_select_default_variables(exec, redis, user_id).await + } + + NotificationBody::EmailChanged { + new_email, + to_email: _, + } => { + let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( + || DatabaseError::Database(sqlx::Error::RowNotFound), + )?; + + let mut map = HashMap::new(); + map.insert(USER_NAME, user.username); + map.insert(EMAILCHANGED_NEW_EMAIL, new_email.clone()); + + Ok(map) + } + + NotificationBody::PaymentFailed { amount, service } => { + let user = DBUser::get_id(user_id, exec, redis).await?.ok_or_else( + || DatabaseError::Database(sqlx::Error::RowNotFound), + )?; + + let url = format!( + "{}/{}", + dotenvy::var("SITE_URL")?, + dotenvy::var("SITE_BILLING_PATH")?, + ); + + let mut map = HashMap::new(); + map.insert(USER_NAME, user.username); + map.insert(PAYMENTFAILED_AMOUNT, amount.clone()); + map.insert(PAYMENTFAILED_SERVICE, service.clone()); + map.insert(BILLING_URL, url); + + Ok(map) + } + + NotificationBody::ProjectUpdate { .. } + | NotificationBody::LegacyMarkdown { .. } + | NotificationBody::ModeratorMessage { .. } + | NotificationBody::Unknown => { + only_select_default_variables(exec, redis, user_id).await + } + } +} diff --git a/apps/labrinth/src/queue/mod.rs b/apps/labrinth/src/queue/mod.rs index 7ccf81c0..b552d728 100644 --- a/apps/labrinth/src/queue/mod.rs +++ b/apps/labrinth/src/queue/mod.rs @@ -1,4 +1,5 @@ pub mod analytics; +pub mod email; pub mod maxmind; pub mod moderation; pub mod payouts; diff --git a/apps/labrinth/src/routes/internal/billing.rs b/apps/labrinth/src/routes/internal/billing.rs index 43151220..c44058ba 100644 --- a/apps/labrinth/src/routes/internal/billing.rs +++ b/apps/labrinth/src/routes/internal/billing.rs @@ -1,5 +1,6 @@ -use crate::auth::{get_user_from_headers, send_email}; +use crate::auth::get_user_from_headers; use crate::database::models::charge_item::DBCharge; +use crate::database::models::notification_item::NotificationBuilder; use crate::database::models::user_item::DBUser; use crate::database::models::user_subscription_item::DBUserSubscription; use crate::database::models::users_redeemals::{self, UserRedeemal}; @@ -13,6 +14,7 @@ use crate::models::billing::{ Product, ProductMetadata, ProductPrice, SubscriptionMetadata, SubscriptionStatus, UserSubscription, }; +use crate::models::notifications::NotificationBody; use crate::models::pats::Scopes; use crate::models::users::Badges; use crate::queue::session::AuthQueue; @@ -2458,7 +2460,7 @@ pub async fn stripe_webhook( ) .await?; - if let Some(email) = metadata.user_item.email { + if metadata.user_item.email.is_some() { let money = rusty_money::Money::from_minor( metadata.charge_item.amount as i64, rusty_money::iso::find( @@ -2467,22 +2469,29 @@ pub async fn stripe_webhook( .unwrap_or(rusty_money::iso::USD), ); - let _ = send_email( - email, - "Payment Failed for Modrinth", - &format!( - "Our attempt to collect payment for {money} from the payment card on file was unsuccessful." - ), - "Please visit the following link below to update your payment method or contact your card provider. If the button does not work, you can copy the link and paste it into your browser.", - Some(( - "Update billing settings", - &format!( - "{}/{}", - dotenvy::var("SITE_URL")?, - dotenvy::var("SITE_BILLING_PATH")? - ), - )), - ); + NotificationBuilder { + body: NotificationBody::PaymentFailed { + amount: money.to_string(), + service: if metadata + .product_item + .metadata + .is_midas() + { + "Modrinth+" + } else if metadata + .product_item + .metadata + .is_pyro() + { + "Modrinth Servers" + } else { + "a Modrinth product" + } + .to_owned(), + }, + } + .insert(metadata.user_item.id, &mut transaction, &redis) + .await?; } transaction.commit().await?; diff --git a/apps/labrinth/src/routes/internal/external_notifications.rs b/apps/labrinth/src/routes/internal/external_notifications.rs new file mode 100644 index 00000000..0efa6276 --- /dev/null +++ b/apps/labrinth/src/routes/internal/external_notifications.rs @@ -0,0 +1,52 @@ +use crate::database::models::ids::DBUserId; +use crate::database::models::notification_item::NotificationBuilder; +use crate::database::models::user_item::DBUser; +use crate::database::redis::RedisPool; +use crate::models::v3::notifications::NotificationBody; +use crate::routes::ApiError; +use crate::util::guards::external_notification_key_guard; +use actix_web::web; +use actix_web::{HttpResponse, post}; +use ariadne::ids::UserId; +use serde::Deserialize; +use sqlx::PgPool; + +pub fn config(cfg: &mut web::ServiceConfig) { + cfg.service(create); +} + +#[derive(Deserialize)] +struct CreateNotification { + pub body: NotificationBody, + pub user_ids: Vec, +} + +#[post("external_notifications", guard = "external_notification_key_guard")] +pub async fn create( + pool: web::Data, + redis: web::Data, + create_notification: web::Json, +) -> Result { + let CreateNotification { body, user_ids } = + create_notification.into_inner(); + let user_ids = user_ids + .into_iter() + .map(|x| DBUserId(x.0 as i64)) + .collect::>(); + + let mut txn = pool.begin().await?; + + if !DBUser::exists_many(&user_ids, &mut *txn).await? { + return Err(ApiError::InvalidInput( + "One of the specified users do not exist.".to_owned(), + )); + } + + NotificationBuilder { body } + .insert_many(user_ids, &mut txn, &redis) + .await?; + + txn.commit().await?; + + Ok(HttpResponse::Accepted().finish()) +} diff --git a/apps/labrinth/src/routes/internal/flows.rs b/apps/labrinth/src/routes/internal/flows.rs index 0c7a37ba..246f0534 100644 --- a/apps/labrinth/src/routes/internal/flows.rs +++ b/apps/labrinth/src/routes/internal/flows.rs @@ -1,14 +1,16 @@ -use crate::auth::email::send_email; use crate::auth::validate::{ get_full_user_from_headers, get_user_record_from_bearer_token, }; use crate::auth::{AuthProvider, AuthenticationError, get_user_from_headers}; use crate::database::models::DBUser; use crate::database::models::flow_item::DBFlow; +use crate::database::models::notification_item::NotificationBuilder; use crate::database::redis::RedisPool; use crate::file_hosting::{FileHost, FileHostPublicity}; +use crate::models::notifications::NotificationBody; use crate::models::pats::Scopes; use crate::models::users::{Badges, Role}; +use crate::queue::email::EmailQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::routes::internal::session::issue_session; @@ -25,6 +27,7 @@ use ariadne::ids::base62_impl::{parse_base62, to_base62}; use ariadne::ids::random_base62_rng; use base64::Engine; use chrono::{Duration, Utc}; +use lettre::message::Mailbox; use rand_chacha::ChaCha20Rng; use rand_chacha::rand_core::SeedableRng; use reqwest::header::AUTHORIZATION; @@ -1159,14 +1162,10 @@ pub async fn auth_callback( ) .execute(&mut *transaction) .await?; - } else if let Some(email) = user.and_then(|x| x.email) { - send_email( - email, - "Authentication method added", - &format!("When logging into Modrinth, you can now log in using the {} authentication provider.", provider.as_str()), - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + } else if let Some(user) = user { + NotificationBuilder { body: NotificationBody::AuthProviderAdded { provider: provider.as_str().to_string() } } + .insert(user.id, &mut transaction, &redis) + .await?; } transaction.commit().await?; @@ -1268,19 +1267,14 @@ pub async fn delete_auth_provider( .update_user_id(user.id.into(), None, &mut transaction) .await?; - if delete_provider.provider != AuthProvider::PayPal - && let Some(email) = user.email - { - send_email( - email, - "Authentication method removed", - &format!( - "When logging into Modrinth, you can no longer log in using the {} authentication provider.", - delete_provider.provider.as_str() - ), - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + if delete_provider.provider != AuthProvider::PayPal { + NotificationBuilder { + body: NotificationBody::AuthProviderRemoved { + provider: delete_provider.provider.as_str().to_string(), + }, + } + .insert(user.id.into(), &mut transaction, &redis) + .await?; } transaction.commit().await?; @@ -1342,6 +1336,7 @@ pub async fn create_account_with_password( pool: Data, redis: Data, new_account: web::Json, + email: web::Data, ) -> Result { new_account.0.validate().map_err(|err| { ApiError::InvalidInput(validation_errors_to_string(err, None)) @@ -1437,6 +1432,10 @@ pub async fn create_account_with_password( let session = issue_session(req, user_id, &mut transaction, &redis).await?; let res = crate::models::sessions::Session::from(session, true, None); + let mailbox: Mailbox = new_account.email.parse().map_err(|_| { + ApiError::InvalidInput("Invalid email address!".to_string()) + })?; + let flow = DBFlow::ConfirmEmail { user_id, confirm_email: new_account.email.clone(), @@ -1444,11 +1443,15 @@ pub async fn create_account_with_password( .insert(Duration::hours(24), &redis) .await?; - send_email_verify( - new_account.email.clone(), - flow, - &format!("Welcome to Modrinth, {}!", new_account.username), - )?; + email + .send_one( + &mut transaction, + NotificationBody::VerifyEmail { flow }, + user_id, + mailbox, + ) + .await? + .as_user_error()?; transaction.commit().await?; @@ -1796,15 +1799,11 @@ pub async fn finish_2fa_flow( codes.push(to_base62(val)); } - if let Some(email) = user.email { - send_email( - email, - "Two-factor authentication enabled", - "When logging into Modrinth, you can now enter a code generated by your authenticator app in addition to entering your usual email address and password.", - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + NotificationBuilder { + body: NotificationBody::TwoFactorEnabled, } + .insert(user.id.into(), &mut transaction, &redis) + .await?; transaction.commit().await?; crate::database::models::DBUser::clear_caches( @@ -1895,15 +1894,11 @@ pub async fn remove_2fa( .execute(&mut *transaction) .await?; - if let Some(email) = user.email { - send_email( - email, - "Two-factor authentication removed", - "When logging into Modrinth, you no longer need two-factor authentication to gain access.", - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + NotificationBuilder { + body: NotificationBody::TwoFactorRemoved, } + .insert(user.id, &mut transaction, &redis) + .await?; transaction.commit().await?; crate::database::models::DBUser::clear_caches(&[(user.id, None)], &redis) @@ -1925,15 +1920,18 @@ pub async fn reset_password_begin( pool: Data, redis: Data, reset_password: web::Json, + email: web::Data, ) -> Result { if !check_hcaptcha(&req, &reset_password.challenge).await? { return Err(ApiError::Turnstile); } + let mut txn = pool.begin().await?; + let user = match crate::database::models::DBUser::get_by_case_insensitive_email( &reset_password.username_or_email, - &**pool, + &mut *txn, ) .await?[..] { @@ -1941,7 +1939,7 @@ pub async fn reset_password_begin( // Try finding by username or ID crate::database::models::DBUser::get( &reset_password.username_or_email, - &**pool, + &mut *txn, &redis, ) .await? @@ -1950,7 +1948,7 @@ pub async fn reset_password_begin( // If there is only one user with the given email, ignoring case, // we can assume it's the user we want to reset the password for crate::database::models::DBUser::get_id( - user_id, &**pool, &redis, + user_id, &mut *txn, &redis, ) .await? } @@ -1962,12 +1960,12 @@ pub async fn reset_password_begin( if let Some(user_id) = crate::database::models::DBUser::get_by_email( &reset_password.username_or_email, - &**pool, + &mut *txn, ) .await? { crate::database::models::DBUser::get_id( - user_id, &**pool, &redis, + user_id, &mut *txn, &redis, ) .await? } else { @@ -1978,7 +1976,7 @@ pub async fn reset_password_begin( if let Some(DBUser { id: user_id, - email: Some(email), + email: user_email, .. }) = user { @@ -1986,23 +1984,21 @@ pub async fn reset_password_begin( .insert(Duration::hours(24), &redis) .await?; - send_email( - email, - "Reset your password", - "Please visit the following link below to reset your password. If the button does not work, you can copy the link and paste it into your browser.", - "If you did not request for your password to be reset, you can safely ignore this email.", - Some(( - "Reset password", - &format!( - "{}/{}?flow={}", - dotenvy::var("SITE_URL")?, - dotenvy::var("SITE_RESET_PASSWORD_PATH")?, - flow - ), - )), - )?; + if let Ok(mailbox) = user_email.unwrap_or_default().parse() { + email + .send_one( + &mut txn, + NotificationBody::ResetPassword { flow }, + user_id, + mailbox, + ) + .await? + .as_user_error()?; + } } + txn.commit().await?; + Ok(HttpResponse::Ok().finish()) } @@ -2138,20 +2134,18 @@ pub async fn change_password( DBFlow::remove(flow, &redis).await?; } - if let Some(email) = user.email { - let changed = if update_password.is_some() { - "changed" - } else { - "removed" - }; - - send_email( - email, - &format!("Password {changed}"), - &format!("Your password has been {changed} on your account."), - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + if update_password.is_some() { + NotificationBuilder { + body: NotificationBody::PasswordChanged, + } + .insert(user.id, &mut transaction, &redis) + .await?; + } else { + NotificationBuilder { + body: NotificationBody::PasswordRemoved, + } + .insert(user.id, &mut transaction, &redis) + .await?; } transaction.commit().await?; @@ -2172,14 +2166,19 @@ pub async fn set_email( req: HttpRequest, pool: Data, redis: Data, - email: web::Json, + email_address: web::Json, + email: web::Data, session_queue: Data, stripe_client: Data, ) -> Result { - email.0.validate().map_err(|err| { + email_address.0.validate().map_err(|err| { ApiError::InvalidInput(validation_errors_to_string(err, None)) })?; + let mailbox: Mailbox = email_address.email.parse().map_err(|_| { + ApiError::InvalidInput("Invalid email address!".to_string()) + })?; + let user = get_user_from_headers( &req, &**pool, @@ -2191,7 +2190,7 @@ pub async fn set_email( .1; if !crate::database::models::DBUser::get_by_case_insensitive_email( - &email.email, + &email_address.email, &**pool, ) .await? @@ -2210,23 +2209,21 @@ pub async fn set_email( SET email = $1, email_verified = FALSE WHERE (id = $2) ", - email.email, + email_address.email, user.id.0 as i64, ) .execute(&mut *transaction) .await?; - if let Some(user_email) = user.email { - send_email( - user_email, - "Email changed", - &format!( - "Your email has been updated to {} on your account.", - email.email - ), - "If you did not make this change, please contact us immediately through our support channels on Discord or via email (support@modrinth.com).", - None, - )?; + if let Some(user_email) = user.email.clone() { + NotificationBuilder { + body: NotificationBody::EmailChanged { + new_email: email_address.email.clone(), + to_email: user_email, + }, + } + .insert(user.id.into(), &mut transaction, &redis) + .await?; } if let Some(customer_id) = user @@ -2238,7 +2235,7 @@ pub async fn set_email( &stripe_client, &customer_id, stripe::UpdateCustomer { - email: Some(&email.email), + email: Some(&email_address.email), ..Default::default() }, ) @@ -2247,18 +2244,23 @@ pub async fn set_email( let flow = DBFlow::ConfirmEmail { user_id: user.id.into(), - confirm_email: email.email.clone(), + confirm_email: email_address.email.clone(), } .insert(Duration::hours(24), &redis) .await?; - send_email_verify( - email.email.clone(), - flow, - "We need to verify your email address.", - )?; + email + .send_one( + &mut transaction, + NotificationBody::VerifyEmail { flow }, + user.id.into(), + mailbox, + ) + .await? + .as_user_error()?; transaction.commit().await?; + crate::database::models::DBUser::clear_caches( &[(user.id.into(), None)], &redis, @@ -2274,6 +2276,7 @@ pub async fn resend_verify_email( pool: Data, redis: Data, session_queue: Data, + email: web::Data, ) -> Result { let user = get_user_from_headers( &req, @@ -2285,7 +2288,7 @@ pub async fn resend_verify_email( .await? .1; - if let Some(email) = user.email { + if let Some(email_address) = user.email { if user.email_verified.unwrap_or(false) { return Err(ApiError::InvalidInput( "User email is already verified!".to_string(), @@ -2294,16 +2297,28 @@ pub async fn resend_verify_email( let flow = DBFlow::ConfirmEmail { user_id: user.id.into(), - confirm_email: email.clone(), + confirm_email: email_address.clone(), } .insert(Duration::hours(24), &redis) .await?; - send_email_verify( - email, - flow, - "We need to verify your email address.", - )?; + let mailbox: Mailbox = email_address.parse().map_err(|_| { + ApiError::InvalidInput("Invalid email address!".to_string()) + })?; + + let mut transaction = pool.begin().await?; + + email + .send_one( + &mut transaction, + NotificationBody::VerifyEmail { flow }, + user.id.into(), + mailbox, + ) + .await? + .as_user_error()?; + + transaction.commit().await?; Ok(HttpResponse::NoContent().finish()) } else { @@ -2438,25 +2453,3 @@ pub async fn get_newsletter_subscription_status( "subscribed": is_subscribed }))) } - -fn send_email_verify( - email: String, - flow: String, - opener: &str, -) -> Result<(), crate::auth::email::MailError> { - send_email( - email, - "Verify your email", - opener, - "Please visit the following link below to verify your email. If the button does not work, you can copy the link and paste it into your browser. This link expires in 24 hours.", - Some(( - "Verify email", - &format!( - "{}/{}?flow={}", - dotenvy::var("SITE_URL")?, - dotenvy::var("SITE_VERIFY_EMAIL_PATH")?, - flow - ), - )), - ) -} diff --git a/apps/labrinth/src/routes/internal/gdpr.rs b/apps/labrinth/src/routes/internal/gdpr.rs index ecc8b984..79022519 100644 --- a/apps/labrinth/src/routes/internal/gdpr.rs +++ b/apps/labrinth/src/routes/internal/gdpr.rs @@ -69,14 +69,22 @@ pub async fn export( .map(|x| crate::models::organizations::Organization::from(x, vec![])) .collect::>(); - let notifs = crate::database::models::notification_item::DBNotification::get_many_user( - user_id, &**pool, &redis, + let notifs = crate::database::models::notification_item::DBNotification::get_all_user( + user_id, &**pool, ) .await? .into_iter() .map(crate::models::notifications::Notification::from) .collect::>(); + let notifs_deliveries = crate::database::models::notifications_deliveries_item::DBNotificationDelivery::get_all_user( + user_id, &**pool, + ) + .await? + .into_iter() + .map(crate::models::notifications::NotificationDelivery::from) + .collect::>(); + let oauth_clients = crate::database::models::oauth_client_item::DBOAuthClient::get_all_user_clients( user_id, &**pool, @@ -195,6 +203,7 @@ pub async fn export( "projects": projects, "orgs": orgs, "notifs": notifs, + "notifs_deliveries": notifs_deliveries, "oauth_clients": oauth_clients, "oauth_authorizations": oauth_authorizations, "pats": pats, diff --git a/apps/labrinth/src/routes/internal/mod.rs b/apps/labrinth/src/routes/internal/mod.rs index 3330ab13..073f08c1 100644 --- a/apps/labrinth/src/routes/internal/mod.rs +++ b/apps/labrinth/src/routes/internal/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod admin; pub mod billing; +pub mod external_notifications; pub mod flows; pub mod gdpr; pub mod medal; @@ -26,6 +27,7 @@ pub fn config(cfg: &mut actix_web::web::ServiceConfig) { .configure(billing::config) .configure(gdpr::config) .configure(statuses::config) - .configure(medal::config), + .configure(medal::config) + .configure(external_notifications::config), ); } diff --git a/apps/labrinth/src/routes/mod.rs b/apps/labrinth/src/routes/mod.rs index 327f67e7..6508792c 100644 --- a/apps/labrinth/src/routes/mod.rs +++ b/apps/labrinth/src/routes/mod.rs @@ -130,7 +130,7 @@ pub enum ApiError { #[error("Password Hashing Error: {0}")] PasswordHashing(#[from] argon2::password_hash::Error), #[error("{0}")] - Mail(#[from] crate::auth::email::MailError), + Mail(#[from] crate::queue::email::MailError), #[error("Error while rerouting request: {0}")] Reroute(#[from] reqwest::Error), #[error("Unable to read Zip Archive: {0}")] diff --git a/apps/labrinth/src/routes/v3/users.rs b/apps/labrinth/src/routes/v3/users.rs index 9f48fc6b..f0e1a070 100644 --- a/apps/labrinth/src/routes/v3/users.rs +++ b/apps/labrinth/src/routes/v3/users.rs @@ -779,7 +779,7 @@ pub async fn user_notifications( } let mut notifications: Vec = - crate::database::models::notification_item::DBNotification::get_many_user( + crate::database::models::notification_item::DBNotification::get_many_user_exposed_on_site( id, &**pool, &redis, ) .await? diff --git a/apps/labrinth/src/scheduler.rs b/apps/labrinth/src/scheduler.rs index 82213eff..7a4a6d03 100644 --- a/apps/labrinth/src/scheduler.rs +++ b/apps/labrinth/src/scheduler.rs @@ -1,5 +1,6 @@ use actix_rt::Arbiter; use futures::StreamExt; +use tokio_stream::wrappers::IntervalStream; pub struct Scheduler { arbiter: Arbiter, @@ -18,7 +19,7 @@ impl Scheduler { } } - pub fn run(&mut self, interval: std::time::Duration, mut task: F) + pub fn run(&self, interval: std::time::Duration, mut task: F) where F: FnMut() -> R + Send + 'static, R: std::future::Future + Send + 'static, @@ -35,5 +36,3 @@ impl Drop for Scheduler { self.arbiter.stop(); } } - -use tokio_stream::wrappers::IntervalStream; diff --git a/apps/labrinth/src/util/guards.rs b/apps/labrinth/src/util/guards.rs index d1fa513d..ec46a4f8 100644 --- a/apps/labrinth/src/util/guards.rs +++ b/apps/labrinth/src/util/guards.rs @@ -2,6 +2,7 @@ use actix_web::guard::GuardContext; pub const ADMIN_KEY_HEADER: &str = "Modrinth-Admin"; pub const MEDAL_KEY_HEADER: &str = "X-Medal-Access-Key"; +pub const EXTERNAL_NOTIFICATION_KEY_HEADER: &str = "External-Notification-Key"; pub fn admin_key_guard(ctx: &GuardContext) -> bool { let admin_key = std::env::var("LABRINTH_ADMIN_KEY").expect( @@ -25,3 +26,19 @@ pub fn medal_key_guard(ctx: &GuardContext) -> bool { .is_some_and(|it| it.as_bytes() == medal_key.as_bytes()), } } + +pub fn external_notification_key_guard(ctx: &GuardContext) -> bool { + let maybe_external_notification_key = + dotenvy::var("LABRINTH_EXTERNAL_NOTIFICATION_KEY").ok(); + + match maybe_external_notification_key { + None => false, + Some(external_notification_key) => ctx + .head() + .headers() + .get(EXTERNAL_NOTIFICATION_KEY_HEADER) + .is_some_and(|it| { + it.as_bytes() == external_notification_key.as_bytes() + }), + } +} diff --git a/apps/labrinth/tests/common/mod.rs b/apps/labrinth/tests/common/mod.rs index 3b1baa96..1efc99bf 100644 --- a/apps/labrinth/tests/common/mod.rs +++ b/apps/labrinth/tests/common/mod.rs @@ -1,3 +1,4 @@ +use labrinth::queue::email::EmailQueue; use labrinth::{LabrinthConfig, file_hosting, queue}; use labrinth::{check_env_vars, clickhouse}; use std::sync::Arc; @@ -39,6 +40,9 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { let stripe_client = stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap()); + let email_queue = + EmailQueue::init(pool.clone(), redis_pool.clone()).unwrap(); + labrinth::app_setup( pool.clone(), ro_pool.clone(), @@ -48,6 +52,7 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { file_host.clone(), maxmind_reader, stripe_client, + email_queue, false, ) }