From 33b2a94d9091591e402fe265d44484aff4ff3c6f Mon Sep 17 00:00:00 2001 From: Geometrically <18202329+Geometrically@users.noreply.github.com> Date: Mon, 5 Feb 2024 12:24:12 -0700 Subject: [PATCH] Fix version creation taking forever (#878) * Fix version creation taking forever * run fmt + prep * fix tests? --- ...f041d8081eb317a21b122c0d61d7b13f58072.json | 22 ++++ src/database/models/ids.rs | 40 ++++++- src/database/models/notification_item.rs | 52 +++------ src/models/v3/ids.rs | 8 +- src/ratelimit/memory.rs | 100 ------------------ 5 files changed, 82 insertions(+), 140 deletions(-) create mode 100644 .sqlx/query-520b6b75e79245e9ec19dbe5c30f041d8081eb317a21b122c0d61d7b13f58072.json diff --git a/.sqlx/query-520b6b75e79245e9ec19dbe5c30f041d8081eb317a21b122c0d61d7b13f58072.json b/.sqlx/query-520b6b75e79245e9ec19dbe5c30f041d8081eb317a21b122c0d61d7b13f58072.json new file mode 100644 index 00000000..893e3ac9 --- /dev/null +++ b/.sqlx/query-520b6b75e79245e9ec19dbe5c30f041d8081eb317a21b122c0d61d7b13f58072.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS(SELECT 1 FROM notifications WHERE id = ANY($1))", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + null + ] + }, + "hash": "520b6b75e79245e9ec19dbe5c30f041d8081eb317a21b122c0d61d7b13f58072" +} diff --git a/src/database/models/ids.rs b/src/database/models/ids.rs index d7e4a97a..2b640c69 100644 --- a/src/database/models/ids.rs +++ b/src/database/models/ids.rs @@ -1,6 +1,6 @@ use super::DatabaseError; use crate::models::ids::base62_impl::to_base62; -use crate::models::ids::random_base62_rng; +use crate::models::ids::{random_base62_rng, random_base62_rng_range}; use censor::Censor; use serde::{Deserialize, Serialize}; use sqlx::sqlx_macros::Type; @@ -41,6 +41,37 @@ macro_rules! generate_ids { }; } +macro_rules! generate_bulk_ids { + ($vis:vis $function_name:ident, $return_type:ty, $select_stmnt:literal, $id_function:expr) => { + $vis async fn $function_name( + count: usize, + con: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result, DatabaseError> { + let mut rng = rand::thread_rng(); + let mut retry_count = 0; + + // Check if ID is unique + loop { + let base = random_base62_rng_range(&mut rng, 1, 10) as i64; + let ids = (0..count).map(|x| base + x as i64).collect::>(); + + let results = sqlx::query!($select_stmnt, &ids) + .fetch_one(&mut **con) + .await?; + + if !results.exists.unwrap_or(true) { + return Ok(ids.into_iter().map(|x| $id_function(x)).collect()); + } + + retry_count += 1; + if retry_count > ID_RETRY_COUNT { + return Err(DatabaseError::RandomId); + } + } + } + }; +} + generate_ids!( pub generate_project_id, ProjectId, @@ -121,6 +152,13 @@ generate_ids!( NotificationId ); +generate_bulk_ids!( + pub generate_many_notification_ids, + NotificationId, + "SELECT EXISTS(SELECT 1 FROM notifications WHERE id = ANY($1))", + NotificationId +); + generate_ids!( pub generate_thread_id, ThreadId, diff --git a/src/database/models/notification_item.rs b/src/database/models/notification_item.rs index 206c5373..49d2fe1f 100644 --- a/src/database/models/notification_item.rs +++ b/src/database/models/notification_item.rs @@ -3,7 +3,6 @@ use crate::database::{models::DatabaseError, redis::RedisPool}; use crate::models::notifications::NotificationBody; use chrono::{DateTime, Utc}; use futures::TryStreamExt; -use itertools::Itertools; use serde::{Deserialize, Serialize}; const USER_NOTIFICATIONS_NAMESPACE: &str = "user_notifications"; @@ -46,37 +45,15 @@ impl NotificationBuilder { transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, redis: &RedisPool, ) -> Result<(), DatabaseError> { - let mut notifications = Vec::new(); - for user in users { - let id = generate_notification_id(&mut *transaction).await?; + let notification_ids = + generate_many_notification_ids(users.len(), &mut *transaction).await?; - notifications.push(Notification { - id, - user_id: user, - body: self.body.clone(), - read: false, - created: Utc::now(), - }); - } - - Notification::insert_many(¬ifications, transaction, redis).await?; - - Ok(()) - } -} - -impl Notification { - pub async fn insert_many( - notifications: &[Notification], - transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, - redis: &RedisPool, - ) -> Result<(), DatabaseError> { - let notification_ids = notifications.iter().map(|n| n.id.0).collect_vec(); - let user_ids = notifications.iter().map(|n| n.user_id.0).collect_vec(); - let bodies = notifications + let body = serde_json::value::to_value(&self.body)?; + let bodies = notification_ids .iter() - .map(|n| Ok(serde_json::value::to_value(n.body.clone())?)) - .collect::, DatabaseError>>()?; + .map(|_| body.clone()) + .collect::>(); + sqlx::query!( " INSERT INTO notifications ( @@ -84,22 +61,23 @@ impl Notification { ) SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::jsonb[]) ", - ¬ification_ids[..], - &user_ids[..], + ¬ification_ids + .into_iter() + .map(|x| x.0) + .collect::>()[..], + &users.iter().map(|x| x.0).collect::>()[..], &bodies[..], ) .execute(&mut **transaction) .await?; - Notification::clear_user_notifications_cache( - notifications.iter().map(|n| &n.user_id), - redis, - ) - .await?; + Notification::clear_user_notifications_cache(&users, redis).await?; Ok(()) } +} +impl Notification { pub async fn get<'a, 'b, E>( id: NotificationId, executor: E, diff --git a/src/models/v3/ids.rs b/src/models/v3/ids.rs index 73d0c32c..d2a6672d 100644 --- a/src/models/v3/ids.rs +++ b/src/models/v3/ids.rs @@ -38,11 +38,15 @@ pub fn random_base62(n: usize) -> u64 { /// This method panics if `n` is 0 or greater than 11, since a `u64` /// can only represent up to 11 character base62 strings pub fn random_base62_rng(rng: &mut R, n: usize) -> u64 { + random_base62_rng_range(rng, n, n) +} + +pub fn random_base62_rng_range(rng: &mut R, n_min: usize, n_max: usize) -> u64 { use rand::Rng; - assert!(n > 0 && n <= 11); + assert!(n_min > 0 && n_max <= 11 && n_min <= n_max); // gen_range is [low, high): max value is `MULTIPLES[n] - 1`, // which is n characters long when encoded - rng.gen_range(MULTIPLES[n - 1]..MULTIPLES[n]) + rng.gen_range(MULTIPLES[n_min - 1]..MULTIPLES[n_max]) } const MULTIPLES: [u64; 12] = [ diff --git a/src/ratelimit/memory.rs b/src/ratelimit/memory.rs index ee52ca6a..14280167 100644 --- a/src/ratelimit/memory.rs +++ b/src/ratelimit/memory.rs @@ -141,103 +141,3 @@ impl Handler for MemoryStoreActor { } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[actix_rt::test] - async fn test_set() { - let store = MemoryStore::new(); - let addr = MemoryStoreActor::from(store.clone()).start(); - let res = addr - .send(ActorMessage::Set { - key: "hello".to_string(), - value: 30usize, - expiry: Duration::from_secs(5), - }) - .await; - let res = res.expect("Failed to send msg"); - match res { - ActorResponse::Set(c) => match c.await { - Ok(()) => {} - Err(e) => panic!("Shouldn't happen {}", &e), - }, - _ => panic!("Shouldn't happen!"), - } - } - - #[actix_rt::test] - async fn test_get() { - let store = MemoryStore::new(); - let addr = MemoryStoreActor::from(store.clone()).start(); - let expiry = Duration::from_secs(5); - let res = addr - .send(ActorMessage::Set { - key: "hello".to_string(), - value: 30usize, - expiry, - }) - .await; - let res = res.expect("Failed to send msg"); - match res { - ActorResponse::Set(c) => match c.await { - Ok(()) => {} - Err(e) => panic!("Shouldn't happen {}", &e), - }, - _ => panic!("Shouldn't happen!"), - } - let res2 = addr.send(ActorMessage::Get("hello".to_string())).await; - let res2 = res2.expect("Failed to send msg"); - match res2 { - ActorResponse::Get(c) => match c.await { - Ok(d) => { - let d = d.unwrap(); - assert_eq!(d, 30usize); - } - Err(e) => panic!("Shouldn't happen {}", &e), - }, - _ => panic!("Shouldn't happen!"), - }; - } - - #[actix_rt::test] - async fn test_expiry() { - let store = MemoryStore::new(); - let addr = MemoryStoreActor::from(store.clone()).start(); - let expiry = Duration::from_secs(3); - let res = addr - .send(ActorMessage::Set { - key: "hello".to_string(), - value: 30usize, - expiry, - }) - .await; - let res = res.expect("Failed to send msg"); - match res { - ActorResponse::Set(c) => match c.await { - Ok(()) => {} - Err(e) => panic!("Shouldn't happen {}", &e), - }, - _ => panic!("Shouldn't happen!"), - } - assert!(addr.connected()); - - let res3 = addr.send(ActorMessage::Expire("hello".to_string())).await; - let res3 = res3.expect("Failed to send msg"); - match res3 { - ActorResponse::Expire(c) => match c.await { - Ok(dur) => { - let now = Duration::from_secs(3); - if dur > now || dur > now + Duration::from_secs(4) { - panic!("Expiry is invalid!"); - } - } - Err(e) => { - panic!("Shouldn't happen: {}", &e); - } - }, - _ => panic!("Shouldn't happen!"), - }; - } -}