Files
AstralRinth/apps/labrinth/src/routes/v3/threads.rs
aecsocket 39f2b0ecb6 Technical review queue (#4775)
* chore: fix typo in status message

* feat(labrinth): overhaul malware scanner report storage and routes

* chore: address some review comments

* feat: add Delphi to Docker Compose `with-delphi` profile

* chore: fix unused import Clippy lint

* feat(labrinth/delphi): use PAT token authorization with project read scopes

* chore: expose file IDs in version queries

* fix: accept null decompiled source payloads from Delphi

* tweak(labrinth): expose base62 file IDs more consistently for Delphi

* feat(labrinth/delphi): support new Delphi report severity field

* chore(labrinth): run `cargo sqlx prepare` to fix Docker build errors

* tweak: add route for fetching Delphi issue type schema, abstract Labrinth away from issue types

* chore: run `cargo sqlx prepare`

* chore: fix typo on frontend generated state file message

* feat: update to use new Delphi issue schema

* wip: tech review endpoints

* wip: add ToSchema for dependent types

* wip: report issues return

* wip

* wip: returning more data

* wip

* Fix up db query

* Delphi configuration to talk to Labrinth

* Get Delphi working with Labrinth

* Add Delphi dummy fixture

* Better Delphi logging

* Improve utoipa for tech review routes

* Add more sorting options for tech review queue

* Oops join

* New routes for fetching issues and reports

* Fix which kind of ID is returned in tech review endpoints

* Deduplicate tech review report rows

* Reduce info sent for projects

* Fetch more thread info

* Address PR comments

* fix ci

* fix postgres version mismatch

* fix version creation

* Implement routes

* fix up tech review

* Allow adding a moderation comment to Delphi rejections

* fix up rebase

* exclude rejected projects from tech review

* add status change msg to tech review thread

* cargo sqlx prepare

* also ignore withheld projects

* More filtering on issue search

* wip: report routes

* Fix up for build

* cargo sqlx prepare

* fix thread message privacy

* New tech review search route

* submit route

* details have statuses now

* add default to drid status

* dedup issue details

* fix sqlx query on empty files

* fixes

* Dedupe issue detail statuses and message on entering tech rev

* Fix qa issues

* Fix qa issues

* fix review comments

* typos

* fix ci

* feat: tech review frontend (#4781)

* chore: fix typo in status message

* feat(labrinth): overhaul malware scanner report storage and routes

* chore: address some review comments

* feat: add Delphi to Docker Compose `with-delphi` profile

* chore: fix unused import Clippy lint

* feat(labrinth/delphi): use PAT token authorization with project read scopes

* chore: expose file IDs in version queries

* fix: accept null decompiled source payloads from Delphi

* tweak(labrinth): expose base62 file IDs more consistently for Delphi

* feat(labrinth/delphi): support new Delphi report severity field

* chore(labrinth): run `cargo sqlx prepare` to fix Docker build errors

* tweak: add route for fetching Delphi issue type schema, abstract Labrinth away from issue types

* chore: run `cargo sqlx prepare`

* chore: fix typo on frontend generated state file message

* feat: update to use new Delphi issue schema

* wip: tech review endpoints

* wip: add ToSchema for dependent types

* wip: report issues return

* wip

* wip: returning more data

* wip

* Fix up db query

* Delphi configuration to talk to Labrinth

* Get Delphi working with Labrinth

* Add Delphi dummy fixture

* Better Delphi logging

* Improve utoipa for tech review routes

* Add more sorting options for tech review queue

* Oops join

* New routes for fetching issues and reports

* Fix which kind of ID is returned in tech review endpoints

* Deduplicate tech review report rows

* Reduce info sent for projects

* Fetch more thread info

* Address PR comments

* fix ci

* fix ci

* fix postgres version mismatch

* fix version creation

* Implement routes

* feat: batch scan alert

* feat: layout

* feat: introduce surface variables

* fix: theme selector

* feat: rough draft of tech review card

* feat: tab switcher

* feat: batch scan btn

* feat: api-client module for tech review

* draft: impl

* feat: auto icons

* fix: layout issues

* feat: fixes to code blocks + flag labels

* feat: temp remove mock data

* fix: search sort types

* fix: intl & lint

* chore: re-enable mock data

* fix: flag badges + auto open first issue in file tab

* feat: update for new routes

* fix: more qa issues

* feat: lazy load sources

* fix: re-enable auth middleware

* feat: impl threads

* fix: lint & severity

* feat: download btn + switch to using NavTabs with new local mode option

* feat: re-add toplevel btns

* feat: reports page consistency

* fix: consistency on project queue

* fix: icons + sizing

* fix: colors and gaps

* fix: impl endpoints

* feat: load all flags on file tab

* feat: thread generics changes

* feat: more qa

* feat: fix collapse

* fix: qa

* feat: msg modal

* fix: ISO import

* feat: qa fixes

* fix: empty state basic

* fix: collapsible region

* fix: collapse thread by default

* feat: rough draft of new process/flow

* fix labrinth build

* fix thread message privacy

* New tech review search route

* feat: qa fixes

* feat: QA changes

* fix: verdict on detail not whole issue

* fix: lint + intl

* fix: lint

* fix: thread message for tech rev verdict

* feat: use anim frames

* fix: exports + typecheck

* polish: qa changes

* feat: qa

* feat: qa polish

* feat: fix malic modal

* fix: lint

* fix: qa + lint

* fix: pagination

* fix: lint

* fix: qa

* intl extract

* fix ci

---------

Signed-off-by: Calum H. <contact@cal.engineer>
Co-authored-by: Alejandro González <me@alegon.dev>
Co-authored-by: aecsocket <aecsocket@tutanota.com>

---------

Signed-off-by: Calum H. <contact@cal.engineer>
Co-authored-by: Alejandro González <me@alegon.dev>
Co-authored-by: Calum H. <contact@cal.engineer>
2025-12-20 11:43:04 +00:00

663 lines
20 KiB
Rust

use std::sync::Arc;
use crate::auth::get_user_from_headers;
use crate::database;
use crate::database::models::image_item;
use crate::database::models::notification_item::NotificationBuilder;
use crate::database::models::thread_item::ThreadMessageBuilder;
use crate::database::redis::RedisPool;
use crate::file_hosting::{FileHost, FileHostPublicity};
use crate::models::ids::{ThreadId, ThreadMessageId};
use crate::models::images::{Image, ImageContext};
use crate::models::notifications::NotificationBody;
use crate::models::pats::Scopes;
use crate::models::projects::ProjectStatus;
use crate::models::threads::{MessageBody, Thread, ThreadType};
use crate::models::users::User;
use crate::queue::session::AuthQueue;
use crate::routes::ApiError;
use actix_web::{HttpRequest, HttpResponse, web};
use futures::TryStreamExt;
use serde::Deserialize;
use sqlx::PgPool;
/// Registers the thread and thread-message routes on `cfg`.
///
/// Registered endpoints:
/// - `GET thread/{id}` handled by [`thread_get`]
/// - `POST thread/{id}` handled by [`thread_send_message`]
/// - `DELETE message/{id}` handled by [`message_delete`]
/// - `GET threads` handled by [`threads_get`]
pub fn config(cfg: &mut web::ServiceConfig) {
    let thread_scope = web::scope("thread")
        .route("{id}", web::get().to(thread_get))
        .route("{id}", web::post().to(thread_send_message));
    let message_scope =
        web::scope("message").route("{id}", web::delete().to(message_delete));

    cfg.service(thread_scope)
        .service(message_scope)
        .route("threads", web::get().to(threads_get));
}
/// Decides whether `user` is allowed to access `thread`.
///
/// Moderators are always authorized. For everyone else the rule depends on
/// the thread type:
/// - `Report`: the user must be the reporter of the linked report.
/// - `Project`: the user must be a member of the project's team, or of the
///   team of the organization that owns the project.
/// - `DirectMessage`: the user must be listed in the thread's members.
///
/// A report/project thread without a linked report/project ID is never
/// authorized for non-moderators.
///
/// # Errors
/// Propagates any database error raised by the membership queries.
pub async fn is_authorized_thread(
    thread: &database::models::DBThread,
    user: &User,
    pool: &PgPool,
) -> Result<bool, ApiError> {
    if user.role.is_mod() {
        return Ok(true);
    }

    let user_id: database::models::DBUserId = user.id.into();

    match thread.type_ {
        ThreadType::Report => {
            let Some(report_id) = thread.report_id else {
                return Ok(false);
            };

            let is_reporter = sqlx::query!(
                "SELECT EXISTS(SELECT 1 FROM reports WHERE id = $1 AND reporter = $2)",
                report_id as database::models::ids::DBReportId,
                user_id as database::models::ids::DBUserId,
            )
            .fetch_one(pool)
            .await?
            .exists
            .unwrap_or(false);

            Ok(is_reporter)
        }
        ThreadType::Project => {
            let Some(project_id) = thread.project_id else {
                return Ok(false);
            };

            // Direct membership in the project's own team.
            let is_team_member = sqlx::query!(
                "SELECT EXISTS(SELECT 1 FROM mods m INNER JOIN team_members tm ON tm.team_id = m.team_id AND tm.user_id = $2 WHERE m.id = $1)",
                project_id as database::models::ids::DBProjectId,
                user_id as database::models::ids::DBUserId,
            )
            .fetch_one(pool)
            .await?
            .exists
            .unwrap_or(false);

            if is_team_member {
                return Ok(true);
            }

            // Otherwise fall back to membership in the owning organization's team.
            let is_org_member = sqlx::query!(
                "SELECT EXISTS(SELECT 1 FROM mods m INNER JOIN organizations o ON m.organization_id = o.id INNER JOIN team_members tm ON tm.team_id = o.team_id AND tm.user_id = $2 WHERE m.id = $1)",
                project_id as database::models::ids::DBProjectId,
                user_id as database::models::ids::DBUserId,
            )
            .fetch_one(pool)
            .await?
            .exists
            .unwrap_or(false);

            Ok(is_org_member)
        }
        ThreadType::DirectMessage => Ok(thread.members.contains(&user_id)),
    }
}
pub async fn filter_authorized_threads(
threads: Vec<database::models::DBThread>,
user: &User,
pool: &web::Data<PgPool>,
redis: &RedisPool,
) -> Result<Vec<Thread>, ApiError> {
let user_id: database::models::DBUserId = user.id.into();
let mut return_threads = Vec::new();
let mut check_threads = Vec::new();
for thread in threads {
if user.role.is_mod()
|| (thread.type_ == ThreadType::DirectMessage
&& thread.members.contains(&user_id))
{
return_threads.push(thread);
} else {
check_threads.push(thread);
}
}
if !check_threads.is_empty() {
let project_thread_ids = check_threads
.iter()
.filter(|x| x.type_ == ThreadType::Project)
.filter_map(|x| x.project_id.map(|x| x.0))
.collect::<Vec<_>>();
if !project_thread_ids.is_empty() {
sqlx::query!(
"
SELECT m.id FROM mods m
INNER JOIN team_members tm ON tm.team_id = m.team_id AND user_id = $2
WHERE m.id = ANY($1)
",
&*project_thread_ids,
user_id as database::models::ids::DBUserId,
)
.fetch(&***pool)
.map_ok(|row| {
check_threads.retain(|x| {
let bool = x.project_id.map(|x| x.0) == Some(row.id);
if bool {
return_threads.push(x.clone());
}
!bool
});
})
.try_collect::<Vec<()>>()
.await?;
}
let mut org_project_thread_ids = check_threads
.iter()
.filter(|x| x.type_ == ThreadType::Project)
.filter_map(|x| x.project_id.map(|x| x.0));
if org_project_thread_ids.next().is_some() {
sqlx::query!(
"
SELECT m.id FROM mods m
INNER JOIN organizations o ON o.id = m.organization_id
INNER JOIN team_members tm ON tm.team_id = o.team_id AND user_id = $2
WHERE m.id = ANY($1)
",
&*project_thread_ids,
user_id as database::models::ids::DBUserId,
)
.fetch(&***pool)
.map_ok(|row| {
check_threads.retain(|x| {
let bool = x.project_id.map(|x| x.0) == Some(row.id);
if bool {
return_threads.push(x.clone());
}
!bool
});
})
.try_collect::<Vec<()>>()
.await?;
}
let report_thread_ids = check_threads
.iter()
.filter(|x| x.type_ == ThreadType::Report)
.filter_map(|x| x.report_id.map(|x| x.0))
.collect::<Vec<_>>();
if !report_thread_ids.is_empty() {
sqlx::query!(
"
SELECT id FROM reports
WHERE id = ANY($1) AND reporter = $2
",
&*report_thread_ids,
user_id as database::models::ids::DBUserId,
)
.fetch(&***pool)
.map_ok(|row| {
check_threads.retain(|x| {
let bool = x.report_id.map(|x| x.0) == Some(row.id);
if bool {
return_threads.push(x.clone());
}
!bool
});
})
.try_collect::<Vec<()>>()
.await?;
}
}
let mut user_ids = return_threads
.iter()
.flat_map(|x| x.members.clone())
.collect::<Vec<database::models::DBUserId>>();
user_ids.append(
&mut return_threads
.iter()
.flat_map(|x| {
x.messages
.iter()
.filter_map(|x| x.author_id)
.collect::<Vec<_>>()
})
.collect::<Vec<database::models::DBUserId>>(),
);
let users: Vec<User> =
database::models::DBUser::get_many_ids(&user_ids, &***pool, redis)
.await?
.into_iter()
.map(From::from)
.collect();
let mut final_threads = Vec::new();
for thread in return_threads {
let mut authors = thread.members.clone();
authors.append(
&mut thread
.messages
.iter()
.filter_map(|x| {
if x.hide_identity && !user.role.is_mod() {
None
} else {
x.author_id
}
})
.collect::<Vec<_>>(),
);
final_threads.push(Thread::from(
thread,
users
.iter()
.filter(|x| authors.contains(&x.id.into()))
.cloned()
.collect(),
user,
));
}
Ok(final_threads)
}
/// `GET thread/{id}`: returns a single thread as JSON.
///
/// The caller is authenticated with the `THREAD_READ` scope. A thread that
/// does not exist and a thread the caller is not authorized for both yield
/// `ApiError::NotFound`.
///
/// The users attached to the response are the thread members plus the
/// message authors; authors of messages flagged `hide_identity` are omitted
/// unless the caller is a moderator.
pub async fn thread_get(
    req: HttpRequest,
    info: web::Path<(ThreadId,)>,
    pool: web::Data<PgPool>,
    redis: web::Data<RedisPool>,
    session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
    let thread_id = info.into_inner().0.into();
    let thread_data =
        database::models::DBThread::get(thread_id, &**pool).await?;

    let user = get_user_from_headers(
        &req,
        &**pool,
        &redis,
        &session_queue,
        Scopes::THREAD_READ,
    )
    .await?
    .1;

    let Some(mut thread) = thread_data else {
        return Err(ApiError::NotFound);
    };
    if !is_authorized_thread(&thread, &user, &pool).await? {
        return Err(ApiError::NotFound);
    }

    // Message authors the caller is allowed to see are added to the member list.
    let visible_authors: Vec<_> = thread
        .messages
        .iter()
        .filter_map(|message| {
            if message.hide_identity && !user.role.is_mod() {
                None
            } else {
                message.author_id
            }
        })
        .collect();
    thread.members.extend(visible_authors);

    let users: Vec<User> = database::models::DBUser::get_many_ids(
        &thread.members,
        &**pool,
        &redis,
    )
    .await?
    .into_iter()
    .map(From::from)
    .collect();

    Ok(HttpResponse::Ok().json(Thread::from(thread, users, &user)))
}
/// Query parameters accepted by [`threads_get`].
#[derive(Deserialize)]
pub struct ThreadIds {
    /// JSON-encoded array of thread IDs; `threads_get` parses it with
    /// `serde_json::from_str::<Vec<ThreadId>>`.
    pub ids: String,
}
/// `GET threads?ids=[...]`: returns several threads as a JSON array.
///
/// The caller is authenticated with the `THREAD_READ` scope. `ids` must be a
/// JSON-encoded array of thread IDs. The fetched threads are passed through
/// [`filter_authorized_threads`], so the response contains only the threads
/// that function grants to the caller.
///
/// # Errors
/// Fails on authentication errors, on an `ids` value that is not a valid
/// JSON array of thread IDs, and on database errors.
pub async fn threads_get(
    req: HttpRequest,
    web::Query(ids): web::Query<ThreadIds>,
    pool: web::Data<PgPool>,
    redis: web::Data<RedisPool>,
    session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
    let user = get_user_from_headers(
        &req,
        &**pool,
        &redis,
        &session_queue,
        Scopes::THREAD_READ,
    )
    .await?
    .1;

    let requested_ids = serde_json::from_str::<Vec<ThreadId>>(&ids.ids)?;
    let thread_ids: Vec<database::models::ids::DBThreadId> =
        requested_ids.into_iter().map(Into::into).collect();

    let threads_data =
        database::models::DBThread::get_many(&thread_ids, &**pool).await?;
    let visible_threads =
        filter_authorized_threads(threads_data, &user, &pool, &redis).await?;

    Ok(HttpResponse::Ok().json(visible_threads))
}
/// Request payload for posting a message to a thread.
#[derive(Deserialize)]
pub struct NewThreadMessage {
    /// Message content. `thread_send_message_internal` rejects every variant
    /// other than `MessageBody::Text`.
    pub body: MessageBody,
}
/// `POST thread/{id}`: posts a new message to a thread.
///
/// Authenticates the caller with the `THREAD_WRITE` scope and delegates all
/// validation and persistence to [`thread_send_message_internal`]. Responds
/// with `204 No Content` on success.
pub async fn thread_send_message(
    req: HttpRequest,
    info: web::Path<(ThreadId,)>,
    pool: web::Data<PgPool>,
    new_message: web::Json<NewThreadMessage>,
    redis: web::Data<RedisPool>,
    session_queue: web::Data<AuthQueue>,
) -> Result<HttpResponse, ApiError> {
    let user = get_user_from_headers(
        &req,
        &**pool,
        &redis,
        &session_queue,
        Scopes::THREAD_WRITE,
    )
    .await?
    .1;

    let thread_id = info.into_inner().0;
    let message = new_message.into_inner();

    thread_send_message_internal(&user, thread_id, &pool, message, &redis)
        .await?;

    Ok(HttpResponse::NoContent().finish())
}
/// Validates and stores a new text message in a thread on behalf of `user`.
///
/// Validation, in order:
/// - the body must be `MessageBody::Text`;
/// - the text must not exceed 65536 bytes;
/// - only moderators may send private messages;
/// - a `replying_to` message must exist and belong to the same thread;
/// - the thread must exist and `user` must be authorized for it
///   (otherwise `ApiError::NotFound`).
///
/// The message, its notifications and its image links are written inside one
/// transaction, so an error after the insert leaves nothing committed.
///
/// Notifications:
/// - Project thread: when the sender is a moderator, the message is not
///   private and the project is not in `Processing` status, every member of
///   the project's team receives a `ModeratorMessage` and a
///   `ModerationMessageReceived` notification.
/// - Report thread: non-moderators may not reply to a closed report; when
///   the sender is not the reporter and the message is not private, the
///   reporter receives a `ModeratorMessage` notification.
///
/// Every image listed in `associated_images` must exist, be in the
/// `thread_message` context and not yet be attached to anything; it is then
/// linked to the newly created message.
pub async fn thread_send_message_internal(
    user: &User,
    thread_id: ThreadId,
    pool: &PgPool,
    new_message: NewThreadMessage,
    redis: &RedisPool,
) -> Result<(), ApiError> {
    let db_thread_id: database::models::DBThreadId = thread_id.into();

    let MessageBody::Text {
        body,
        replying_to,
        private,
        ..
    } = &new_message.body
    else {
        return Err(ApiError::InvalidInput(
            "You may only send text messages through this route!".to_string(),
        ));
    };
    let is_private = *private;

    if body.len() > 65536 {
        return Err(ApiError::InvalidInput(
            "Input body is too long!".to_string(),
        ));
    }

    if is_private && !user.role.is_mod() {
        return Err(ApiError::InvalidInput(
            "You are not allowed to send private messages!".to_string(),
        ));
    }

    if let Some(replying_to) = replying_to {
        let replied_message = database::models::DBThreadMessage::get(
            (*replying_to).into(),
            pool,
        )
        .await?;

        let Some(replied_message) = replied_message else {
            return Err(ApiError::InvalidInput(
                "Message replied to does not exist!".to_string(),
            ));
        };

        if replied_message.thread_id != db_thread_id {
            return Err(ApiError::InvalidInput(
                "Message replied to is from another thread!".to_string(),
            ));
        }
    }

    let Some(thread) =
        database::models::DBThread::get(db_thread_id, pool).await?
    else {
        return Err(ApiError::NotFound);
    };

    if !is_authorized_thread(&thread, user, pool).await? {
        return Err(ApiError::NotFound);
    }

    let mut transaction = pool.begin().await?;

    let message_id = ThreadMessageBuilder {
        author_id: Some(user.id.into()),
        body: new_message.body.clone(),
        thread_id: thread.id,
        hide_identity: user.role.is_mod(),
    }
    .insert(&mut transaction)
    .await?;

    if let Some(project_id) = thread.project_id {
        let project =
            database::models::DBProject::get_id(project_id, pool, redis)
                .await?;

        if let Some(project) = project
            && project.inner.status != ProjectStatus::Processing
            && user.role.is_mod()
            && !is_private
        {
            let members = database::models::DBTeamMember::get_from_team_full(
                project.inner.team_id,
                pool,
                redis,
            )
            .await?;

            NotificationBuilder {
                body: NotificationBody::ModeratorMessage {
                    thread_id: thread.id.into(),
                    message_id: message_id.into(),
                    project_id: Some(project.inner.id.into()),
                    report_id: None,
                },
            }
            .insert_many(
                members.iter().map(|x| x.user_id).collect(),
                &mut transaction,
                redis,
            )
            .await?;

            NotificationBuilder {
                body: NotificationBody::ModerationMessageReceived {
                    project_id: project.inner.id.into(),
                },
            }
            .insert_many(
                members.iter().map(|x| x.user_id).collect(),
                &mut transaction,
                redis,
            )
            .await?;
        }
    } else if let Some(report_id) = thread.report_id {
        let report =
            database::models::report_item::DBReport::get(report_id, pool)
                .await?;

        if let Some(report) = report {
            // Returning here drops the transaction, discarding the insert above.
            if report.closed && !user.role.is_mod() {
                return Err(ApiError::InvalidInput(
                    "You may not reply to a closed report".to_string(),
                ));
            }

            if user.id != report.reporter.into() && !is_private {
                NotificationBuilder {
                    body: NotificationBody::ModeratorMessage {
                        thread_id: thread.id.into(),
                        message_id: message_id.into(),
                        project_id: None,
                        report_id: Some(report.id.into()),
                    },
                }
                .insert(report.reporter, &mut transaction, redis)
                .await?;
            }
        }
    }

    if let MessageBody::Text {
        associated_images, ..
    } = &new_message.body
    {
        for image_id in associated_images {
            let Some(db_image) = image_item::DBImage::get(
                (*image_id).into(),
                &mut *transaction,
                redis,
            )
            .await?
            else {
                return Err(ApiError::InvalidInput(format!(
                    "Image {image_id} does not exist"
                )));
            };

            let image: Image = db_image.into();

            // The image must be in the thread-message context and not yet
            // attached to anything.
            if !matches!(image.context, ImageContext::ThreadMessage { .. })
                || image.context.inner_id().is_some()
            {
                return Err(ApiError::InvalidInput(format!(
                    "Image {image_id} is not unused and in the 'thread_message' context"
                )));
            }

            // Link the image to the message that was just created. This
            // previously bound the *thread's* ID, which `message_delete`
            // (which looks images up by message ID) could never find.
            sqlx::query!(
                "
UPDATE uploaded_images
SET thread_message_id = $1
WHERE id = $2
",
                message_id.0,
                image_id.0 as i64
            )
            .execute(&mut *transaction)
            .await?;

            image_item::DBImage::clear_cache(image.id.into(), redis).await?;
        }
    }

    transaction.commit().await?;

    Ok(())
}
/// `DELETE message/{id}`: deletes a thread message and its attached images.
///
/// The caller is authenticated with the `THREAD_WRITE` scope and must be
/// either a moderator or the author of the message. Each image attached to
/// the message is removed from the file host (when its URL contains the
/// `CDN_URL` prefix) and from the database, then the message itself is
/// removed. Responds with `204 No Content`, or `ApiError::NotFound` when the
/// message does not exist.
pub async fn message_delete(
    req: HttpRequest,
    info: web::Path<(ThreadMessageId,)>,
    pool: web::Data<PgPool>,
    redis: web::Data<RedisPool>,
    session_queue: web::Data<AuthQueue>,
    file_host: web::Data<Arc<dyn FileHost + Send + Sync>>,
) -> Result<HttpResponse, ApiError> {
    let user = get_user_from_headers(
        &req,
        &**pool,
        &redis,
        &session_queue,
        Scopes::THREAD_WRITE,
    )
    .await?
    .1;

    let lookup = database::models::DBThreadMessage::get(
        info.into_inner().0.into(),
        &**pool,
    )
    .await?;

    let Some(message) = lookup else {
        return Err(ApiError::NotFound);
    };

    let is_author = message.author_id == Some(user.id.into());
    if !user.role.is_mod() && !is_author {
        return Err(ApiError::CustomAuthentication(
            "You cannot delete this message!".to_string(),
        ));
    }

    let mut transaction = pool.begin().await?;

    let context = ImageContext::ThreadMessage {
        thread_message_id: Some(message.id.into()),
    };
    let images =
        database::DBImage::get_many_contexted(context, &mut transaction)
            .await?;

    let cdn_url = dotenvy::var("CDN_URL")?;
    let cdn_prefix = format!("{cdn_url}/");

    for image in images {
        // The file host path is whatever follows the CDN prefix in the URL.
        if let Some(file_path) = image.url.split(&cdn_prefix).nth(1) {
            file_host
                .delete_file(
                    file_path,
                    FileHostPublicity::Public, // FIXME: Consider using private file storage?
                )
                .await?;
        }

        database::DBImage::remove(image.id, &mut transaction, &redis)
            .await?;
    }

    let private = message.body.is_private();
    database::models::DBThreadMessage::remove_full(
        message.id,
        private,
        &mut transaction,
    )
    .await?;

    transaction.commit().await?;

    Ok(HttpResponse::NoContent().body(""))
}