Housekeeping + Fix DB perf issues (#542)

* Housekeeping + fix db perf issues

* run prep
This commit is contained in:
Geometrically
2023-02-22 16:11:14 -07:00
committed by GitHub
parent 9afdc55416
commit 00d09aa01e
21 changed files with 1251 additions and 674 deletions

View File

@@ -108,11 +108,13 @@ pub async fn process_payout(
pool: web::Data<PgPool>,
data: web::Json<PayoutData>,
) -> Result<HttpResponse, ApiError> {
let start = data
.date
.date()
.and_hms_nano(0, 0, 0, 0)
.with_timezone(&Utc);
let start: DateTime<Utc> = DateTime::from_utc(
data.date
.date_naive()
.and_hms_nano_opt(0, 0, 0, 0)
.unwrap_or_default(),
Utc,
);
let client = reqwest::Client::new();
let mut transaction = pool.begin().await?;

View File

@@ -267,7 +267,8 @@ pub async fn handle_stripe_webhook(
if let Some(item) = invoice.lines.data.first() {
let expires: DateTime<Utc> = DateTime::from_utc(
NaiveDateTime::from_timestamp(item.period.end, 0),
NaiveDateTime::from_timestamp_opt(item.period.end, 0)
.unwrap_or_default(),
Utc,
) + Duration::days(1);

View File

@@ -9,15 +9,18 @@ use crate::models::projects::{
use crate::models::teams::Permissions;
use crate::routes::ApiError;
use crate::search::{search_for_project, SearchConfig, SearchError};
use crate::util::auth::{get_user_from_headers, is_authorized};
use crate::util::auth::{
filter_authorized_projects, get_user_from_headers, is_authorized,
};
use crate::util::routes::read_from_payload;
use crate::util::validate::validation_errors_to_string;
use actix_web::{delete, get, patch, post, web, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use futures::TryStreamExt;
use meilisearch_sdk::indexes::IndexesResults;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::{PgPool, Row};
use sqlx::PgPool;
use std::sync::Arc;
use validator::Validate;
@@ -91,16 +94,8 @@ pub async fn projects_get(
let user_option = get_user_from_headers(req.headers(), &**pool).await.ok();
let projects: Vec<_> = futures::stream::iter(projects_data)
.filter_map(|data| async {
if is_authorized(&data.inner, &user_option, &pool).await.ok()? {
Some(Project::from(data))
} else {
None
}
})
.collect()
.await;
let projects =
filter_authorized_projects(projects_data, &user_option, &pool).await?;
Ok(HttpResponse::Ok().json(projects))
}
@@ -216,26 +211,25 @@ pub async fn dependency_list(
use futures::stream::TryStreamExt;
//TODO: This query is not checked at compile time! Once SQLX parses this query correctly, please use the query! macro instead
let dependencies = sqlx::query(
let dependencies = sqlx::query!(
"
SELECT d.dependency_id, vd.mod_id, d.mod_dependency_id
SELECT d.dependency_id, COALESCE(vd.mod_id, 0) mod_id, d.mod_dependency_id
FROM versions v
INNER JOIN dependencies d ON d.dependent_id = v.id
LEFT JOIN versions vd ON d.dependency_id = vd.id
WHERE v.mod_id = $1
",
id as database::models::ProjectId
)
.bind(id as database::models::ProjectId)
.fetch_many(&**pool)
.try_filter_map(|e| async {
Ok(e.right().map(|x| {
(
x.get::<Option<i64>, usize>(0)
x.dependency_id
.map(database::models::VersionId),
x.get::<Option<i64>, usize>(1)
.map(database::models::ProjectId),
x.get::<Option<i64>, usize>(2)
if x.mod_id == Some(0) { None } else { x.mod_id
.map(database::models::ProjectId) },
x.mod_dependency_id
.map(database::models::ProjectId),
)
}))
@@ -262,19 +256,20 @@ pub async fn dependency_list(
})
.collect::<Vec<_>>();
let (projects_result, versions_result) = futures::join!(
database::Project::get_many_full(&project_ids, &**pool,),
let (projects_result, versions_result) = futures::future::try_join(
database::Project::get_many_full(&project_ids, &**pool),
database::Version::get_many_full(
dependencies.iter().filter_map(|x| x.0).collect(),
&**pool,
)
);
),
)
.await?;
let mut projects = projects_result?
let mut projects = projects_result
.into_iter()
.map(models::projects::Project::from)
.collect::<Vec<_>>();
let mut versions = versions_result?
let mut versions = versions_result
.into_iter()
.map(models::projects::Version::from)
.collect::<Vec<_>>();
@@ -2372,9 +2367,9 @@ pub async fn delete_from_index(
let client =
meilisearch_sdk::client::Client::new(&*config.address, &*config.key);
let indexes: Vec<meilisearch_sdk::indexes::Index> =
client.get_indexes().await?;
for index in indexes {
let indexes: IndexesResults = client.get_indexes().await?;
for index in indexes.results {
index.delete_document(id.to_string()).await?;
}

View File

@@ -18,7 +18,8 @@ pub async fn get_stats(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_one(&**pool);
.fetch_one(&**pool)
.await?;
let versions = sqlx::query!(
"
@@ -36,7 +37,8 @@ pub async fn get_stats(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_one(&**pool);
.fetch_one(&**pool)
.await?;
let authors = sqlx::query!(
"
@@ -50,7 +52,8 @@ pub async fn get_stats(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_one(&**pool);
.fetch_one(&**pool)
.await?;
let files = sqlx::query!(
"
@@ -67,10 +70,8 @@ pub async fn get_stats(
.map(|x| x.to_string())
.collect::<Vec<String>>(),
)
.fetch_one(&**pool);
let (projects, versions, authors, files) =
futures::future::try_join4(projects, versions, authors, files).await?;
.fetch_one(&**pool)
.await?;
let json = json!({
"projects": projects.count,

View File

@@ -5,11 +5,10 @@ use serde::Serialize;
use sqlx::PgPool;
use crate::database;
use crate::models::projects::{Version, VersionType};
use crate::models::projects::VersionType;
use crate::util::auth::{
get_user_from_headers, is_authorized, is_authorized_version,
filter_authorized_versions, get_user_from_headers, is_authorized,
};
use futures::StreamExt;
use super::ApiError;
@@ -48,22 +47,10 @@ pub async fn forge_updates(
let versions =
database::models::Version::get_many_full(version_ids, &**pool).await?;
let mut versions = futures::stream::iter(versions)
.filter_map(|data| async {
if is_authorized_version(&data.inner, &user_option, &pool)
.await
.ok()?
{
Some(data)
} else {
None
}
})
.collect::<Vec<_>>()
.await;
let mut versions =
filter_authorized_versions(versions, &user_option, &pool).await?;
versions
.sort_by(|a, b| b.inner.date_published.cmp(&a.inner.date_published));
versions.sort_by(|a, b| b.date_published.cmp(&a.date_published));
#[derive(Serialize)]
struct ForgeUpdates {
@@ -81,8 +68,6 @@ pub async fn forge_updates(
};
for version in versions {
let version = Version::from(version);
if version.version_type == VersionType::Release {
for game_version in &version.game_versions {
response

View File

@@ -7,11 +7,11 @@ use crate::util::auth::get_user_from_headers;
use crate::util::routes::ok_or_not_found;
use crate::{database, models};
use actix_web::{delete, get, post, web, HttpRequest, HttpResponse};
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::HashMap;
use tokio::sync::RwLock;
#[derive(Deserialize)]
pub struct HashQuery {
@@ -460,7 +460,7 @@ pub async fn update_files(
let result = sqlx::query!(
"
SELECT f.url url, h.hash hash, h.algorithm algorithm, f.version_id version_id, v.mod_id project_id FROM hashes h
SELECT h.hash, v.mod_id FROM hashes h
INNER JOIN files f ON h.file_id = f.id
INNER JOIN versions v ON v.id = f.version_id AND v.status != ANY($1)
INNER JOIN mods m on v.mod_id = m.id
@@ -471,49 +471,52 @@ pub async fn update_files(
update_data.algorithm,
&*crate::models::projects::ProjectStatus::iterator().filter(|x| x.is_hidden()).map(|x| x.to_string()).collect::<Vec<String>>(),
)
.fetch_all(&mut *transaction)
.fetch_many(&mut *transaction)
.try_filter_map(|e| async {
Ok(e.right().map(|m| (m.hash, database::models::ids::ProjectId(m.mod_id))))
})
.try_collect::<Vec<_>>()
.await?;
let version_ids: RwLock<HashMap<database::models::VersionId, Vec<u8>>> =
RwLock::new(HashMap::new());
let mut version_ids: HashMap<database::models::VersionId, Vec<u8>> =
HashMap::new();
futures::future::try_join_all(result.into_iter().map(|row| async {
let updated_versions = database::models::Version::get_project_versions(
database::models::ProjectId(row.project_id),
Some(
update_data
.game_versions
.clone()
.iter()
.map(|x| x.0.clone())
.collect(),
),
Some(
update_data
.loaders
.clone()
.iter()
.map(|x| x.0.clone())
.collect(),
),
None,
None,
None,
&**pool,
)
.await?;
if let Some(latest_version) = updated_versions.first() {
let mut version_ids = version_ids.write().await;
version_ids.insert(*latest_version, row.hash);
}
Ok::<(), ApiError>(())
}))
let updated_versions = database::models::Version::get_projects_versions(
result
.iter()
.map(|x| x.1)
.collect::<Vec<database::models::ProjectId>>()
.clone(),
Some(
update_data
.game_versions
.clone()
.iter()
.map(|x| x.0.clone())
.collect(),
),
Some(
update_data
.loaders
.clone()
.iter()
.map(|x| x.0.clone())
.collect(),
),
None,
None,
None,
&**pool,
)
.await?;
let version_ids = version_ids.into_inner();
for (hash, id) in result {
if let Some(latest_version) =
updated_versions.get(&id).and_then(|x| x.last())
{
version_ids.insert(*latest_version, hash);
}
}
let versions = database::models::Version::get_many_full(
version_ids.keys().copied().collect(),
@@ -533,8 +536,7 @@ pub async fn update_files(
models::projects::Version::from(version),
);
} else {
let version_id: models::projects::VersionId =
version.inner.id.into();
let version_id: VersionId = version.inner.id.into();
return Err(ApiError::Database(DatabaseError::Other(format!(
"Could not parse hash for version {version_id}"

View File

@@ -2,16 +2,16 @@ use super::ApiError;
use crate::database;
use crate::models;
use crate::models::projects::{
Dependency, FileType, Version, VersionStatus, VersionType,
Dependency, FileType, VersionStatus, VersionType,
};
use crate::models::teams::Permissions;
use crate::util::auth::{
get_user_from_headers, is_authorized, is_authorized_version,
filter_authorized_versions, get_user_from_headers, is_authorized,
is_authorized_version,
};
use crate::util::validate::validation_errors_to_string;
use actix_web::{delete, get, patch, post, web, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use validator::Validate;
@@ -70,23 +70,16 @@ pub async fn version_list(
database::models::Version::get_many_full(version_ids, &**pool)
.await?;
let mut response = futures::stream::iter(versions.clone())
.filter_map(|data| async {
if is_authorized_version(&data.inner, &user_option, &pool)
.await
.ok()?
&& filters
.featured
.map(|featured| featured == data.inner.featured)
.unwrap_or(true)
{
Some(Version::from(data))
} else {
None
}
let mut response = versions
.iter()
.filter(|version| {
filters
.featured
.map(|featured| featured == version.inner.featured)
.unwrap_or(true)
})
.collect::<Vec<_>>()
.await;
.cloned()
.collect::<Vec<_>>();
versions.sort_by(|a, b| {
b.inner.date_published.cmp(&a.inner.date_published)
@@ -97,16 +90,15 @@ pub async fn version_list(
&& !versions.is_empty()
&& filters.featured.unwrap_or(false)
{
let (loaders, game_versions) = futures::join!(
let (loaders, game_versions) = futures::future::try_join(
database::models::categories::Loader::list(&**pool),
database::models::categories::GameVersion::list_filter(
None,
Some(true),
&**pool
)
);
let (loaders, game_versions) = (loaders?, game_versions?);
&**pool,
),
)
.await?;
let mut joined_filters = Vec::new();
for game_version in &game_versions {
@@ -122,21 +114,24 @@ pub async fn version_list(
version.game_versions.contains(&filter.0.version)
&& version.loaders.contains(&filter.1.loader)
})
.map(|version| {
response.push(Version::from(version.clone()))
})
.map(|version| response.push(version.clone()))
.unwrap_or(());
});
if response.is_empty() {
versions
.into_iter()
.for_each(|version| response.push(Version::from(version)));
.for_each(|version| response.push(version));
}
}
response.sort_by(|a, b| b.date_published.cmp(&a.date_published));
response.dedup_by(|a, b| a.id == b.id);
response.sort_by(|a, b| {
b.inner.date_published.cmp(&a.inner.date_published)
});
response.dedup_by(|a, b| a.inner.id == b.inner.id);
let response =
filter_authorized_versions(response, &user_option, &pool).await?;
Ok(HttpResponse::Ok().json(response))
} else {
@@ -190,19 +185,8 @@ pub async fn versions_get(
let user_option = get_user_from_headers(req.headers(), &**pool).await.ok();
let versions: Vec<_> = futures::stream::iter(versions_data)
.filter_map(|data| async {
if is_authorized_version(&data.inner, &user_option, &pool)
.await
.ok()?
{
Some(Version::from(data))
} else {
None
}
})
.collect()
.await;
let versions =
filter_authorized_versions(versions_data, &user_option, &pool).await?;
Ok(HttpResponse::Ok().json(versions))
}