Move download counting to worker (#306)

* Move download counting to worker

* Run `cargo sqlx prepare`

* Format & some Clippy fixes
This commit is contained in:
Danielle
2022-02-21 18:57:40 -08:00
committed by GitHub
parent 9492363b22
commit 3f671b918a
13 changed files with 126 additions and 275 deletions

View File

@@ -347,16 +347,6 @@ impl Version {
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
DELETE FROM downloads
WHERE downloads.version_id = $1
",
id as VersionId,
)
.execute(&mut *transaction)
.await?;
let files = sqlx::query!(
"
SELECT files.id, files.url, files.filename, files.is_primary FROM files

View File

@@ -65,6 +65,8 @@ async fn main() -> std::io::Result<()> {
}
}
info!("Starting Labrinth on {}", dotenv::var("BIND_ADDR").unwrap());
let search_config = search::SearchConfig {
address: dotenv::var("MEILISEARCH_ADDR").unwrap(),
key: dotenv::var("MEILISEARCH_KEY").unwrap(),
@@ -163,22 +165,6 @@ async fn main() -> std::io::Result<()> {
info!("Deleting old records from temporary tables");
async move {
let downloads_result = sqlx::query!(
"
DELETE FROM downloads
WHERE date < (CURRENT_DATE - INTERVAL '30 minutes ago')
"
)
.execute(&pool_ref)
.await;
if let Err(e) = downloads_result {
warn!(
"Deleting old records from temporary table downloads failed: {:?}",
e
);
}
let states_result = sqlx::query!(
"
DELETE FROM states
@@ -319,6 +305,7 @@ fn check_env_vars() -> bool {
failed |= check_var::<String>("SITE_URL");
failed |= check_var::<String>("CDN_URL");
failed |= check_var::<String>("LABRINTH_ADMIN_KEY");
failed |= check_var::<String>("DATABASE_URL");
failed |= check_var::<String>("MEILISEARCH_ADDR");
failed |= check_var::<String>("MEILISEARCH_KEY");

View File

@@ -87,6 +87,7 @@ pub fn versions_config(cfg: &mut web::ServiceConfig) {
web::scope("version")
.service(versions::version_get)
.service(versions::version_delete)
.service(versions::version_count_patch)
.service(version_creation::upload_file_to_version)
.service(versions::version_edit),
);

View File

@@ -10,7 +10,6 @@ use actix_web::{delete, get, web, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::borrow::Borrow;
use std::sync::Arc;
/// A specific version of a mod
@@ -230,11 +229,9 @@ pub struct DownloadRedirect {
#[allow(clippy::await_holding_refcell_ref)]
#[get("{version_id}/download")]
pub async fn download_version(
req: HttpRequest,
info: web::Path<(String,)>,
pool: web::Data<PgPool>,
algorithm: web::Query<Algorithm>,
pepper: web::Data<Pepper>,
) -> Result<HttpResponse, ApiError> {
let hash = info.into_inner().0;
@@ -253,64 +250,6 @@ pub async fn download_version(
.map_err(|e| ApiError::DatabaseError(e.into()))?;
if let Some(id) = result {
let real_ip = req.connection_info();
let ip_option = real_ip.borrow().peer_addr();
if let Some(ip) = ip_option {
let hash = sha1::Sha1::from(format!("{}{}", ip, pepper.pepper)).hexdigest();
let download_exists = sqlx::query!(
"SELECT EXISTS(SELECT 1 FROM downloads WHERE version_id = $1 AND date > (CURRENT_DATE - INTERVAL '30 minutes ago') AND identifier = $2)",
id.version_id,
hash,
)
.fetch_one(&**pool)
.await
.map_err(|e| ApiError::DatabaseError(e.into()))?
.exists.unwrap_or(false);
if !download_exists {
sqlx::query!(
"
INSERT INTO downloads (
version_id, identifier
)
VALUES (
$1, $2
)
",
id.version_id,
hash
)
.execute(&**pool)
.await
.map_err(|e| ApiError::DatabaseError(e.into()))?;
sqlx::query!(
"
UPDATE versions
SET downloads = downloads + 1
WHERE id = $1
",
id.version_id,
)
.execute(&**pool)
.await
.map_err(|e| ApiError::DatabaseError(e.into()))?;
sqlx::query!(
"
UPDATE mods
SET downloads = downloads + 1
WHERE id = $1
",
id.mod_id,
)
.execute(&**pool)
.await
.map_err(|e| ApiError::DatabaseError(e.into()))?;
}
}
Ok(HttpResponse::TemporaryRedirect()
.append_header(("Location", &*id.url))
.json(DownloadRedirect { url: id.url }))

View File

@@ -1,16 +1,14 @@
use super::ApiError;
use crate::database::models::version_item::QueryVersion;
use crate::file_hosting::FileHost;
use crate::models;
use crate::{models, database};
use crate::models::projects::{GameVersion, Loader, Version};
use crate::models::teams::Permissions;
use crate::util::auth::get_user_from_headers;
use crate::util::routes::ok_or_not_found;
use crate::{database, Pepper};
use actix_web::{delete, get, post, web, HttpRequest, HttpResponse};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;
@@ -70,11 +68,9 @@ pub struct DownloadRedirect {
// under /api/v1/version_file/{hash}/download
#[get("{version_id}/download")]
pub async fn download_version(
req: HttpRequest,
info: web::Path<(String,)>,
pool: web::Data<PgPool>,
algorithm: web::Query<Algorithm>,
pepper: web::Data<Pepper>,
) -> Result<HttpResponse, ApiError> {
let hash = info.into_inner().0.to_lowercase();
let mut transaction = pool.begin().await?;
@@ -93,15 +89,6 @@ pub async fn download_version(
.await?;
if let Some(id) = result {
download_version_inner(
database::models::VersionId(id.version_id),
database::models::ProjectId(id.project_id),
&req,
&mut transaction,
&pepper,
)
.await?;
transaction.commit().await?;
Ok(HttpResponse::TemporaryRedirect()
@@ -112,84 +99,6 @@ pub async fn download_version(
}
}
async fn download_version_inner(
version_id: database::models::VersionId,
project_id: database::models::ProjectId,
req: &HttpRequest,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
pepper: &web::Data<Pepper>,
) -> Result<(), ApiError> {
let real_ip = req.connection_info();
let ip_option = if dotenv::var("CLOUDFLARE_INTEGRATION")
.ok()
.map(|i| i.parse().unwrap())
.unwrap_or(false)
{
if let Some(header) = req.headers().get("CF-Connecting-IP") {
header.to_str().ok()
} else {
real_ip.borrow().peer_addr()
}
} else {
real_ip.borrow().peer_addr()
};
if let Some(ip) = ip_option {
let hash = sha1::Sha1::from(format!("{}{}", ip, pepper.pepper)).hexdigest();
let download_exists = sqlx::query!(
"SELECT EXISTS(SELECT 1 FROM downloads WHERE version_id = $1 AND date > (CURRENT_DATE - INTERVAL '30 minutes ago') AND identifier = $2)",
version_id as database::models::VersionId,
hash,
)
.fetch_one(&mut *transaction)
.await
?
.exists.unwrap_or(false);
if !download_exists {
sqlx::query!(
"
INSERT INTO downloads (
version_id, identifier
)
VALUES (
$1, $2
)
",
version_id as database::models::VersionId,
hash
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
UPDATE versions
SET downloads = downloads + 1
WHERE id = $1
",
version_id as database::models::VersionId,
)
.execute(&mut *transaction)
.await?;
sqlx::query!(
"
UPDATE mods
SET downloads = downloads + 1
WHERE id = $1
",
project_id as database::models::ProjectId,
)
.execute(&mut *transaction)
.await?;
}
}
Ok(())
}
// under /api/v1/version_file/{hash}
#[delete("{version_id}")]
pub async fn delete_file(
@@ -431,10 +340,8 @@ pub async fn get_versions_from_hashes(
#[post("download")]
pub async fn download_files(
req: HttpRequest,
pool: web::Data<PgPool>,
file_data: web::Json<FileHashes>,
pepper: web::Data<Pepper>,
) -> Result<HttpResponse, ApiError> {
let hashes_parsed: Vec<Vec<u8>> = file_data
.hashes
@@ -457,19 +364,10 @@ pub async fn download_files(
.fetch_all(&mut *transaction)
.await?;
let mut response = HashMap::new();
for row in result {
download_version_inner(
database::models::VersionId(row.version_id),
database::models::ProjectId(row.project_id),
&req,
&mut transaction,
&pepper,
)
.await?;
response.insert(hex::encode(row.hash), row.url);
}
let response = result
.into_iter()
.map(|row| (hex::encode(row.hash), row.url))
.collect::<HashMap<String, String>>();
Ok(HttpResponse::Ok().json(response))
}

View File

@@ -4,6 +4,7 @@ use crate::models;
use crate::models::projects::{Dependency, Version};
use crate::models::teams::Permissions;
use crate::util::auth::get_user_from_headers;
use crate::util::guards::admin_key_guard;
use crate::util::validate::validation_errors_to_string;
use actix_web::{delete, get, patch, web, HttpRequest, HttpResponse};
use serde::{Deserialize, Serialize};
@@ -420,6 +421,49 @@ pub async fn version_edit(
}
}
// This is an internal route, cannot be used without key
#[patch("{version_id}/_count-download", guard = "admin_key_guard")]
pub async fn version_count_patch(
info: web::Path<(models::ids::VersionId,)>,
pool: web::Data<PgPool>,
) -> Result<HttpResponse, ApiError> {
let version = info.into_inner().0;
let version = database::models::ids::VersionId::from(version);
futures::future::try_join(
sqlx::query!(
"UPDATE versions
SET downloads = downloads + 1
WHERE (id = $1)",
version as database::models::ids::VersionId
)
.execute(pool.as_ref()),
async {
let project_id = sqlx::query!(
"SELECT mod_id FROM versions
WHERE (id = $1)",
version as database::models::ids::VersionId
)
.fetch_one(pool.as_ref())
.await?
.mod_id;
sqlx::query!(
"UPDATE mods
SET downloads = downloads + 1
WHERE (id = $1)",
project_id
)
.execute(pool.as_ref())
.await
},
)
.await
.map_err(ApiError::SqlxDatabaseError)?;
Ok(HttpResponse::Ok().body(""))
}
#[delete("{version_id}")]
pub async fn version_delete(
req: HttpRequest,

12
src/util/guards.rs Normal file
View File

@@ -0,0 +1,12 @@
use actix_web::guard::GuardContext;
pub const ADMIN_KEY_HEADER: &str = "Modrinth-Admin";
pub fn admin_key_guard(ctx: &GuardContext) -> bool {
let admin_key = std::env::var("LABRINTH_ADMIN_KEY")
.expect("No admin key provided, this should have been caught by check_env_vars");
ctx.head()
.headers()
.get(ADMIN_KEY_HEADER)
.map_or(false, |it| it.as_bytes() == admin_key.as_bytes())
}

View File

@@ -1,6 +1,7 @@
pub mod auth;
pub mod env;
pub mod ext;
pub mod guards;
pub mod routes;
pub mod validate;
pub mod webhook;

View File

@@ -37,7 +37,7 @@ pub enum SupportedGameVersions {
All,
PastDate(DateTime<Utc>),
Range(DateTime<Utc>, DateTime<Utc>),
Custom(Vec<GameVersion>),
#[allow(dead_code)] Custom(Vec<GameVersion>),
}
pub trait Validator: Sync {