You've already forked AstralRinth
forked from didirus/AstralRinth
move to monorepo dir
This commit is contained in:
258
apps/labrinth/src/queue/analytics.rs
Normal file
258
apps/labrinth/src/queue/analytics.rs
Normal file
@@ -0,0 +1,258 @@
|
||||
use crate::database::models::DatabaseError;
|
||||
use crate::database::redis::RedisPool;
|
||||
use crate::models::analytics::{Download, PageView, Playtime};
|
||||
use crate::routes::ApiError;
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use redis::cmd;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashMap;
|
||||
use std::net::Ipv6Addr;
|
||||
|
||||
const DOWNLOADS_NAMESPACE: &str = "downloads";
|
||||
const VIEWS_NAMESPACE: &str = "views";
|
||||
|
||||
/// In-memory staging area for analytics events, flushed periodically by
/// [`AnalyticsQueue::index`].
///
/// Views and downloads are keyed by `(stripped_ip, project_id)` so that the
/// same client hitting the same project within one flush interval is
/// deduplicated/grouped (see `strip_ip` for how the key is derived).
pub struct AnalyticsQueue {
    // All page views per (stripped ip, project id) pair in this interval.
    views_queue: DashMap<(u64, u64), Vec<PageView>>,
    // Only the most recent download per (stripped ip, project id) pair —
    // `insert` overwrites earlier entries for the same key.
    downloads_queue: DashMap<(u64, u64), Download>,
    // Playtime records, deduplicated by the set's equality.
    playtime_queue: DashSet<Playtime>,
}
|
||||
|
||||
impl Default for AnalyticsQueue {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Batches analytics data points + transactions every few minutes
|
||||
// Batches analytics data points + transactions every few minutes
impl AnalyticsQueue {
    /// Creates an empty queue. All three maps are pre-sized to 1000 entries
    /// to reduce rehashing under typical load.
    pub fn new() -> Self {
        AnalyticsQueue {
            views_queue: DashMap::with_capacity(1000),
            downloads_queue: DashMap::with_capacity(1000),
            playtime_queue: DashSet::with_capacity(1000),
        }
    }

    /// Collapses an IP address into a `u64` dedup key.
    ///
    /// IPv4-mapped addresses keep all four octets (placed in the high 32 bits,
    /// low bits zeroed); native IPv6 addresses keep only the first 8 octets,
    /// i.e. the /64 prefix, so hosts in the same /64 share a key.
    fn strip_ip(ip: Ipv6Addr) -> u64 {
        if let Some(ip) = ip.to_ipv4_mapped() {
            let octets = ip.octets();
            u64::from_be_bytes([octets[0], octets[1], octets[2], octets[3], 0, 0, 0, 0])
        } else {
            let octets = ip.octets();
            u64::from_be_bytes([
                octets[0], octets[1], octets[2], octets[3], octets[4], octets[5], octets[6],
                octets[7],
            ])
        }
    }

    /// Stages a page view, grouped under its (stripped ip, project) key.
    pub fn add_view(&self, page_view: PageView) {
        let ip_stripped = Self::strip_ip(page_view.ip);

        self.views_queue
            .entry((ip_stripped, page_view.project_id))
            .or_default()
            .push(page_view);
    }

    /// Stages a download. Only the latest download per (stripped ip, project)
    /// pair survives until the next flush — `insert` overwrites.
    pub fn add_download(&self, download: Download) {
        let ip_stripped = Self::strip_ip(download.ip);
        self.downloads_queue
            .insert((ip_stripped, download.project_id), download);
    }

    /// Stages a playtime record (set semantics — exact duplicates collapse).
    pub fn add_playtime(&self, playtime: Playtime) {
        self.playtime_queue.insert(playtime);
    }

    /// Flushes all staged analytics: playtimes and views go to ClickHouse,
    /// downloads go to ClickHouse plus Postgres counter updates. Redis is used
    /// to rate-limit per (ip, project) across flush intervals (6h TTL keys).
    pub async fn index(
        &self,
        client: clickhouse::Client,
        redis: &RedisPool,
        pool: &PgPool,
    ) -> Result<(), ApiError> {
        // Snapshot-then-clear drain. NOTE(review): events added between the
        // `clone()` and the `clear()` are silently dropped — the window is
        // tiny but nonzero; confirm this loss is acceptable.
        let views_queue = self.views_queue.clone();
        self.views_queue.clear();

        let downloads_queue = self.downloads_queue.clone();
        self.downloads_queue.clear();

        let playtime_queue = self.playtime_queue.clone();
        self.playtime_queue.clear();

        if !playtime_queue.is_empty() {
            let mut playtimes = client.insert("playtime")?;

            for playtime in playtime_queue {
                playtimes.write(&playtime).await?;
            }

            playtimes.end().await?;
        }

        if !views_queue.is_empty() {
            let mut views_keys = Vec::new();
            // (views for this key, monetized flag) — flag may be cleared below.
            let mut raw_views = Vec::new();

            for (key, views) in views_queue {
                views_keys.push(key);
                raw_views.push((views, true));
            }

            let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?;

            // Fetch prior view counts for every key in one MGET; `results`
            // is index-aligned with `views_keys` / `raw_views`.
            let results = cmd("MGET")
                .arg(
                    views_keys
                        .iter()
                        .map(|x| format!("{}:{}-{}", VIEWS_NAMESPACE, x.0, x.1))
                        .collect::<Vec<_>>(),
                )
                .query_async::<_, Vec<Option<u32>>>(&mut redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut pipe = redis::pipe();
            for (idx, count) in results.into_iter().enumerate() {
                let key = &views_keys[idx];

                let new_count = if let Some((views, monetized)) = raw_views.get_mut(idx) {
                    if let Some(count) = count {
                        // Already over the per-interval cap: demonetize these
                        // views and skip refreshing the counter key.
                        if count > 3 {
                            *monetized = false;
                            continue;
                        }

                        // This batch pushes the total past the cap: record the
                        // views but demonetize them.
                        if (count + views.len() as u32) > 3 {
                            *monetized = false;
                        }

                        count + (views.len() as u32)
                    } else {
                        // NOTE(review): the no-prior-count branch does not apply
                        // the >3 cap even when views.len() exceeds it — confirm
                        // this is intentional.
                        views.len() as u32
                    }
                } else {
                    1
                };

                // Counter expires after 6 hours.
                pipe.atomic().set_ex(
                    format!("{}:{}-{}", VIEWS_NAMESPACE, key.0, key.1),
                    new_count,
                    6 * 60 * 60,
                );
            }
            pipe.query_async(&mut *redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut views = client.insert("views")?;

            for (all_views, monetized) in raw_views {
                for (idx, mut view) in all_views.into_iter().enumerate() {
                    // At most the first view of a still-monetized group keeps
                    // its monetized flag; everything else is cleared.
                    if idx != 0 || !monetized {
                        view.monetized = false;
                    }

                    views.write(&view).await?;
                }
            }

            views.end().await?;
        }

        if !downloads_queue.is_empty() {
            let mut downloads_keys = Vec::new();
            // Index-keyed so entries can be dropped (rate-limited) below
            // without disturbing the alignment with `downloads_keys`.
            let raw_downloads = DashMap::new();

            for (index, (key, download)) in downloads_queue.into_iter().enumerate() {
                downloads_keys.push(key);
                raw_downloads.insert(index, download);
            }

            let mut redis = redis.pool.get().await.map_err(DatabaseError::RedisPool)?;

            let results = cmd("MGET")
                .arg(
                    downloads_keys
                        .iter()
                        .map(|x| format!("{}:{}-{}", DOWNLOADS_NAMESPACE, x.0, x.1))
                        .collect::<Vec<_>>(),
                )
                .query_async::<_, Vec<Option<u32>>>(&mut redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut pipe = redis::pipe();
            for (idx, count) in results.into_iter().enumerate() {
                let key = &downloads_keys[idx];

                let new_count = if let Some(count) = count {
                    // More than 5 downloads per (ip, project) in the TTL
                    // window: discard this download entirely.
                    if count > 5 {
                        raw_downloads.remove(&idx);
                        continue;
                    }

                    count + 1
                } else {
                    1
                };

                pipe.atomic().set_ex(
                    format!("{}:{}-{}", DOWNLOADS_NAMESPACE, key.0, key.1),
                    new_count,
                    6 * 60 * 60,
                );
            }
            pipe.query_async(&mut *redis)
                .await
                .map_err(DatabaseError::CacheError)?;

            let mut transaction = pool.begin().await?;
            let mut downloads = client.insert("downloads")?;

            // Per-version / per-project increments for the Postgres counters.
            let mut version_downloads: HashMap<i64, i32> = HashMap::new();
            let mut project_downloads: HashMap<i64, i32> = HashMap::new();

            for (_, download) in raw_downloads {
                *version_downloads
                    .entry(download.version_id as i64)
                    .or_default() += 1;
                *project_downloads
                    .entry(download.project_id as i64)
                    .or_default() += 1;

                downloads.write(&download).await?;
            }

            // Bulk-increment version download counters via UNNEST arrays;
            // keys()/values() iterate in matching order over the same map.
            sqlx::query(
                "
                UPDATE versions v
                SET downloads = v.downloads + x.amount
                FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
                WHERE v.id = x.id
                ",
            )
            .bind(version_downloads.keys().copied().collect::<Vec<_>>())
            .bind(version_downloads.values().copied().collect::<Vec<_>>())
            .execute(&mut *transaction)
            .await?;

            sqlx::query(
                "
                UPDATE mods m
                SET downloads = m.downloads + x.amount
                FROM unnest($1::BIGINT[], $2::int[]) AS x(id, amount)
                WHERE m.id = x.id
                ",
            )
            .bind(project_downloads.keys().copied().collect::<Vec<_>>())
            .bind(project_downloads.values().copied().collect::<Vec<_>>())
            .execute(&mut *transaction)
            .await?;

            // NOTE(review): Postgres commits before the ClickHouse insert is
            // finalized — if `end()` fails, counters and raw events diverge.
            transaction.commit().await?;
            downloads.end().await?;
        }

        Ok(())
    }
}
|
||||
82
apps/labrinth/src/queue/maxmind.rs
Normal file
82
apps/labrinth/src/queue/maxmind.rs
Normal file
@@ -0,0 +1,82 @@
|
||||
use flate2::read::GzDecoder;
|
||||
use log::warn;
|
||||
use maxminddb::geoip2::Country;
|
||||
use std::io::{Cursor, Read};
|
||||
use std::net::Ipv6Addr;
|
||||
use tar::Archive;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Holds the MaxMind GeoLite2 country database used for IP → country lookups.
/// `None` when no database has been successfully downloaded yet; `index()`
/// swaps in a fresh copy on successful re-download.
pub struct MaxMindIndexer {
    pub reader: RwLock<Option<maxminddb::Reader<Vec<u8>>>>,
}
|
||||
|
||||
impl MaxMindIndexer {
|
||||
pub async fn new() -> Result<Self, reqwest::Error> {
|
||||
let reader = MaxMindIndexer::inner_index(false).await.ok().flatten();
|
||||
|
||||
Ok(MaxMindIndexer {
|
||||
reader: RwLock::new(reader),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn index(&self) -> Result<(), reqwest::Error> {
|
||||
let reader = MaxMindIndexer::inner_index(false).await?;
|
||||
|
||||
if let Some(reader) = reader {
|
||||
let mut reader_new = self.reader.write().await;
|
||||
*reader_new = Some(reader);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn inner_index(
|
||||
should_panic: bool,
|
||||
) -> Result<Option<maxminddb::Reader<Vec<u8>>>, reqwest::Error> {
|
||||
let response = reqwest::get(
|
||||
format!(
|
||||
"https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country&license_key={}&suffix=tar.gz",
|
||||
dotenvy::var("MAXMIND_LICENSE_KEY").unwrap()
|
||||
)
|
||||
).await?.bytes().await.unwrap().to_vec();
|
||||
|
||||
let tarfile = GzDecoder::new(Cursor::new(response));
|
||||
let mut archive = Archive::new(tarfile);
|
||||
|
||||
if let Ok(entries) = archive.entries() {
|
||||
for mut file in entries.flatten() {
|
||||
if let Ok(path) = file.header().path() {
|
||||
if path.extension().and_then(|x| x.to_str()) == Some("mmdb") {
|
||||
let mut buf = Vec::new();
|
||||
file.read_to_end(&mut buf).unwrap();
|
||||
|
||||
let reader = maxminddb::Reader::from_source(buf).unwrap();
|
||||
|
||||
return Ok(Some(reader));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if should_panic {
|
||||
panic!("Unable to download maxmind database- did you get a license key?")
|
||||
} else {
|
||||
warn!("Unable to download maxmind database.");
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query(&self, ip: Ipv6Addr) -> Option<String> {
|
||||
let maxmind = self.reader.read().await;
|
||||
|
||||
if let Some(ref maxmind) = *maxmind {
|
||||
maxmind
|
||||
.lookup::<Country>(ip.into())
|
||||
.ok()
|
||||
.and_then(|x| x.country.and_then(|x| x.iso_code.map(|x| x.to_string())))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
6
apps/labrinth/src/queue/mod.rs
Normal file
6
apps/labrinth/src/queue/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
/// Batched analytics ingestion (page views, downloads, playtimes).
pub mod analytics;
/// MaxMind GeoLite2 database download and IP → country lookup.
pub mod maxmind;
/// Automated moderation checks for submitted projects.
pub mod moderation;
pub mod payouts;
pub mod session;
pub mod socket;
|
||||
885
apps/labrinth/src/queue/moderation.rs
Normal file
885
apps/labrinth/src/queue/moderation.rs
Normal file
@@ -0,0 +1,885 @@
|
||||
use crate::auth::checks::filter_visible_versions;
|
||||
use crate::database;
|
||||
use crate::database::models::notification_item::NotificationBuilder;
|
||||
use crate::database::models::thread_item::ThreadMessageBuilder;
|
||||
use crate::database::redis::RedisPool;
|
||||
use crate::models::ids::ProjectId;
|
||||
use crate::models::notifications::NotificationBody;
|
||||
use crate::models::pack::{PackFile, PackFileHash, PackFormat};
|
||||
use crate::models::projects::ProjectStatus;
|
||||
use crate::models::threads::MessageBody;
|
||||
use crate::routes::ApiError;
|
||||
use dashmap::DashSet;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Cursor, Read};
|
||||
use std::time::Duration;
|
||||
use zip::ZipArchive;
|
||||
|
||||
const AUTOMOD_ID: i64 = 0;
|
||||
|
||||
/// Collected moderation findings for a single project.
pub struct ModerationMessages {
    // Findings that apply to the project as a whole.
    pub messages: Vec<ModerationMessage>,
    // Findings scoped to a specific version, keyed by version number string.
    pub version_specific: HashMap<String, Vec<ModerationMessage>>,
}
|
||||
|
||||
impl ModerationMessages {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.messages.is_empty() && self.version_specific.is_empty()
|
||||
}
|
||||
|
||||
pub fn markdown(&self, auto_mod: bool) -> String {
|
||||
let mut str = "".to_string();
|
||||
|
||||
for message in &self.messages {
|
||||
str.push_str(&format!("## {}\n", message.header()));
|
||||
str.push_str(&format!("{}\n", message.body()));
|
||||
str.push('\n');
|
||||
}
|
||||
|
||||
for (version_num, messages) in &self.version_specific {
|
||||
for message in messages {
|
||||
str.push_str(&format!(
|
||||
"## Version {}: {}\n",
|
||||
version_num,
|
||||
message.header()
|
||||
));
|
||||
str.push_str(&format!("{}\n", message.body()));
|
||||
str.push('\n');
|
||||
}
|
||||
}
|
||||
|
||||
if auto_mod {
|
||||
str.push_str("<hr />\n\n");
|
||||
str.push_str("🤖 This is an automated message generated by AutoMod (BETA). If you are facing issues, please [contact support](https://support.modrinth.com).");
|
||||
}
|
||||
|
||||
str
|
||||
}
|
||||
|
||||
pub fn should_reject(&self, first_time: bool) -> bool {
|
||||
self.messages.iter().any(|x| x.rejectable(first_time))
|
||||
|| self
|
||||
.version_specific
|
||||
.values()
|
||||
.any(|x| x.iter().any(|x| x.rejectable(first_time)))
|
||||
}
|
||||
|
||||
pub fn approvable(&self) -> bool {
|
||||
self.messages.iter().all(|x| x.approvable())
|
||||
&& self
|
||||
.version_specific
|
||||
.values()
|
||||
.all(|x| x.iter().all(|x| x.approvable()))
|
||||
}
|
||||
}
|
||||
|
||||
/// A single automated-moderation finding. Each variant maps to a header and
/// user-facing body via `header()` / `body()`.
pub enum ModerationMessage {
    // Resource pack / shader has no gallery images.
    MissingGalleryImage,
    // A version has no file attached.
    NoPrimaryFile,
    // Project has no client/server side type information.
    NoSideTypes,
    // Modpack redistributes files whose licenses disallow it.
    PackFilesNotAllowed {
        // Per-sha1 identification results for the pack's files.
        files: HashMap<String, IdentifiedFile>,
        // True when some files could not be identified at all.
        incomplete: bool,
    },
    // No license selected.
    MissingLicense,
    // Custom license selected without a license URL.
    MissingCustomLicenseUrl {
        license: String,
    },
}
|
||||
|
||||
impl ModerationMessage {
    /// Whether this finding justifies rejecting the project.
    /// `first_time` loosens/tightens the policy: unidentified or merely
    /// disallowed files only reject on a first submission, while permanently
    /// disallowed files always reject.
    pub fn rejectable(&self, first_time: bool) -> bool {
        match self {
            ModerationMessage::NoPrimaryFile => true,
            ModerationMessage::PackFilesNotAllowed { files, incomplete } => {
                // Incomplete identification only counts against first-timers.
                (!incomplete || first_time)
                    && files.values().any(|x| match x.status {
                        ApprovalType::Yes => false,
                        ApprovalType::WithAttributionAndSource => false,
                        ApprovalType::WithAttribution => false,
                        ApprovalType::No => first_time,
                        ApprovalType::PermanentNo => true,
                        ApprovalType::Unidentified => first_time,
                    })
            }
            ModerationMessage::MissingGalleryImage => true,
            ModerationMessage::MissingLicense => true,
            ModerationMessage::MissingCustomLicenseUrl { .. } => true,
            ModerationMessage::NoSideTypes => true,
        }
    }

    /// Whether the project can still be approved despite this finding.
    /// Only `PackFilesNotAllowed` can ever be approvable, and only when every
    /// identified file has an approved status.
    pub fn approvable(&self) -> bool {
        match self {
            ModerationMessage::NoPrimaryFile => false,
            ModerationMessage::PackFilesNotAllowed { files, .. } => {
                files.values().all(|x| x.status.approved())
            }
            ModerationMessage::MissingGalleryImage => false,
            ModerationMessage::MissingLicense => false,
            ModerationMessage::MissingCustomLicenseUrl { .. } => false,
            ModerationMessage::NoSideTypes => false,
        }
    }

    /// Markdown section header for this finding.
    pub fn header(&self) -> &'static str {
        match self {
            ModerationMessage::NoPrimaryFile => "No primary files",
            ModerationMessage::PackFilesNotAllowed { .. } => "Copyrighted Content",
            ModerationMessage::MissingGalleryImage => "Missing Gallery Images",
            ModerationMessage::MissingLicense => "Missing License",
            ModerationMessage::MissingCustomLicenseUrl { .. } => "Missing License URL",
            ModerationMessage::NoSideTypes => "Missing Environment Information",
        }
    }

    /// User-facing markdown body for this finding.
    pub fn body(&self) -> String {
        match self {
            ModerationMessage::NoPrimaryFile => "Please attach a file to this version. All files on Modrinth must have files associated with their versions.\n".to_string(),
            ModerationMessage::PackFilesNotAllowed { files, .. } => {
                let mut str = "".to_string();
                str.push_str("This pack redistributes copyrighted material. Please refer to [Modrinth's guide on obtaining modpack permissions](https://docs.modrinth.com/modpacks/permissions) for more information.\n\n");

                // Bucket files by their approval status for the four lists below.
                let mut attribute_mods = Vec::new();
                let mut no_mods = Vec::new();
                let mut permanent_no_mods = Vec::new();
                let mut unidentified_mods = Vec::new();
                for (_, approval) in files.iter() {
                    match approval.status {
                        ApprovalType::Yes | ApprovalType::WithAttributionAndSource => {}
                        ApprovalType::WithAttribution => attribute_mods.push(&approval.file_name),
                        ApprovalType::No => no_mods.push(&approval.file_name),
                        ApprovalType::PermanentNo => permanent_no_mods.push(&approval.file_name),
                        ApprovalType::Unidentified => unidentified_mods.push(&approval.file_name),
                    }
                }

                // Appends `headline` plus a bullet per file; FTB mods get a
                // pointer to their Modrinth-native replacement.
                fn print_mods(projects: Vec<&String>, headline: &str, val: &mut String) {
                    if projects.is_empty() { return }

                    val.push_str(&format!("{headline}\n\n"));

                    for project in &projects {
                        let additional_text = if project.contains("ftb-quests") {
                            Some("Heracles")
                        } else if project.contains("ftb-ranks") || project.contains("ftb-essentials") {
                            Some("Prometheus")
                        } else if project.contains("ftb-teams") {
                            Some("Argonauts")
                        } else if project.contains("ftb-chunks") {
                            Some("Cadmus")
                        } else {
                            None
                        };

                        // NOTE(review): no space between "{project}" and
                        // "(consider" — looks like a typo in the output text;
                        // confirm before changing the user-facing string.
                        val.push_str(&if let Some(additional_text) = additional_text {
                            format!("- {project}(consider using [{additional_text}](https://modrinth.com/mod/{}) instead)\n", additional_text.to_lowercase())
                        } else {
                            format!("- {project}\n")
                        })
                    }

                    // Always true here (early return above) — kept as-is.
                    if !projects.is_empty() {
                        val.push('\n');
                    }
                }

                print_mods(attribute_mods, "The following content has attribution requirements, meaning that you must link back to the page where you originally found this content in your modpack description or version changelog (e.g. linking a mod's CurseForge page if you got it from CurseForge):", &mut str);
                print_mods(no_mods, "The following content is not allowed in Modrinth modpacks due to licensing restrictions. Please contact the author(s) directly for permission or remove the content from your modpack:", &mut str);
                print_mods(permanent_no_mods, "The following content is not allowed in Modrinth modpacks, regardless of permission obtained. This may be because it breaks Modrinth's content rules or because the authors, upon being contacted for permission, have declined. Please remove the content from your modpack:", &mut str);
                print_mods(unidentified_mods, "The following content could not be identified. Please provide proof of its origin along with proof that you have permission to include it:", &mut str);

                str
            },
            ModerationMessage::MissingGalleryImage => "We ask that resource packs like yours show off their content using images in the Gallery, or optionally in the Description, in order to effectively and clearly inform users of the content in your pack per section 2.1 of [Modrinth's content rules](https://modrinth.com/legal/rules#general-expectations).\n
Keep in mind that you should:\n
- Set a featured image that best represents your pack.
- Ensure all your images have titles that accurately label the image, and optionally, details on the contents of the image in the images Description.
- Upload any relevant images in your Description to your Gallery tab for best results.".to_string(),
            ModerationMessage::MissingLicense => "You must select a License before your project can be published publicly, having a License associated with your project is important to protecting your rights and allowing others to use your content as you intend. For more information, you can see our [Guide to Licensing Mods](<https://blog.modrinth.com/licensing-guide/>).".to_string(),
            ModerationMessage::MissingCustomLicenseUrl { license } => format!("It looks like you've selected the License \"{license}\" without providing a valid License link. When using a custom License you must provide a link directly to the License in the License Link field."),
            ModerationMessage::NoSideTypes => "Your project's side types are currently set to Unknown on both sides. Please set accurate side types!".to_string(),
        }
    }
}
|
||||
|
||||
/// Set of project ids awaiting automated moderation checks; drained by the
/// background `task` loop.
pub struct AutomatedModerationQueue {
    pub projects: DashSet<ProjectId>,
}
|
||||
|
||||
impl Default for AutomatedModerationQueue {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
projects: DashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AutomatedModerationQueue {
|
||||
pub async fn task(&self, pool: PgPool, redis: RedisPool) {
|
||||
loop {
|
||||
let projects = self.projects.clone();
|
||||
self.projects.clear();
|
||||
|
||||
for project in projects {
|
||||
async {
|
||||
let project =
|
||||
database::Project::get_id((project).into(), &pool, &redis).await?;
|
||||
|
||||
if let Some(project) = project {
|
||||
let res = async {
|
||||
let mut mod_messages = ModerationMessages {
|
||||
messages: vec![],
|
||||
version_specific: HashMap::new(),
|
||||
};
|
||||
|
||||
if project.project_types.iter().any(|x| ["mod", "modpack"].contains(&&**x)) && !project.aggregate_version_fields.iter().any(|x| ["server_only", "client_only", "client_and_server", "singleplayer"].contains(&&*x.field_name)) {
|
||||
mod_messages.messages.push(ModerationMessage::NoSideTypes);
|
||||
}
|
||||
|
||||
if project.inner.license == "LicenseRef-Unknown" || project.inner.license == "LicenseRef-" {
|
||||
mod_messages.messages.push(ModerationMessage::MissingLicense);
|
||||
} else if project.inner.license.starts_with("LicenseRef-") && project.inner.license != "LicenseRef-All-Rights-Reserved" && project.inner.license_url.is_none() {
|
||||
mod_messages.messages.push(ModerationMessage::MissingCustomLicenseUrl { license: project.inner.license.clone() });
|
||||
}
|
||||
|
||||
if (project.project_types.contains(&"resourcepack".to_string()) || project.project_types.contains(&"shader".to_string())) &&
|
||||
project.gallery_items.is_empty() &&
|
||||
!project.categories.contains(&"audio".to_string()) &&
|
||||
!project.categories.contains(&"locale".to_string())
|
||||
{
|
||||
mod_messages.messages.push(ModerationMessage::MissingGalleryImage);
|
||||
}
|
||||
|
||||
let versions =
|
||||
database::Version::get_many(&project.versions, &pool, &redis)
|
||||
.await?
|
||||
.into_iter()
|
||||
// we only support modpacks at this time
|
||||
.filter(|x| x.project_types.contains(&"modpack".to_string()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for version in versions {
|
||||
let primary_file = version.files.iter().find_or_first(|x| x.primary);
|
||||
|
||||
if let Some(primary_file) = primary_file {
|
||||
let data = reqwest::get(&primary_file.url).await?.bytes().await?;
|
||||
|
||||
let reader = Cursor::new(data);
|
||||
let mut zip = ZipArchive::new(reader)?;
|
||||
|
||||
let pack: PackFormat = {
|
||||
let mut file =
|
||||
if let Ok(file) = zip.by_name("modrinth.index.json") {
|
||||
file
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents)?;
|
||||
|
||||
serde_json::from_str(&contents)?
|
||||
};
|
||||
|
||||
// sha1, pack file, file path, murmur
|
||||
let mut hashes: Vec<(
|
||||
String,
|
||||
Option<PackFile>,
|
||||
String,
|
||||
Option<u32>,
|
||||
)> = pack
|
||||
.files
|
||||
.clone()
|
||||
.into_iter()
|
||||
.flat_map(|x| {
|
||||
let hash = x.hashes.get(&PackFileHash::Sha1);
|
||||
|
||||
if let Some(hash) = hash {
|
||||
let path = x.path.clone();
|
||||
Some((hash.clone(), Some(x), path, None))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for i in 0..zip.len() {
|
||||
let mut file = zip.by_index(i)?;
|
||||
|
||||
if file.name().starts_with("overrides/mods")
|
||||
|| file.name().starts_with("client-overrides/mods")
|
||||
|| file.name().starts_with("server-overrides/mods")
|
||||
|| file.name().starts_with("overrides/shaderpacks")
|
||||
|| file.name().starts_with("client-overrides/shaderpacks")
|
||||
|| file.name().starts_with("overrides/resourcepacks")
|
||||
|| file.name().starts_with("client-overrides/resourcepacks")
|
||||
{
|
||||
if file.name().matches('/').count() > 2 || file.name().ends_with(".txt") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut contents = Vec::new();
|
||||
file.read_to_end(&mut contents)?;
|
||||
|
||||
let hash = sha1::Sha1::from(&contents).hexdigest();
|
||||
let murmur = hash_flame_murmur32(contents);
|
||||
|
||||
hashes.push((
|
||||
hash,
|
||||
None,
|
||||
file.name().to_string(),
|
||||
Some(murmur),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let files = database::models::Version::get_files_from_hash(
|
||||
"sha1".to_string(),
|
||||
&hashes.iter().map(|x| x.0.clone()).collect::<Vec<_>>(),
|
||||
&pool,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let version_ids =
|
||||
files.iter().map(|x| x.version_id).collect::<Vec<_>>();
|
||||
let versions_data = filter_visible_versions(
|
||||
database::models::Version::get_many(
|
||||
&version_ids,
|
||||
&pool,
|
||||
&redis,
|
||||
)
|
||||
.await?,
|
||||
&None,
|
||||
&pool,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut final_hashes = HashMap::new();
|
||||
|
||||
for version in versions_data {
|
||||
for file in
|
||||
files.iter().filter(|x| x.version_id == version.id.into())
|
||||
{
|
||||
if let Some(hash) = file.hashes.get("sha1") {
|
||||
if let Some((index, (sha1, _, file_name, _))) = hashes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, (value, _, _, _))| value == hash)
|
||||
{
|
||||
final_hashes
|
||||
.insert(sha1.clone(), IdentifiedFile { status: ApprovalType::Yes, file_name: file_name.clone() });
|
||||
|
||||
hashes.remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// All files are on Modrinth, so we don't send any messages
|
||||
if hashes.is_empty() {
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE files
|
||||
SET metadata = $1
|
||||
WHERE id = $2
|
||||
",
|
||||
serde_json::to_value(&MissingMetadata {
|
||||
identified: final_hashes,
|
||||
flame_files: Default::default(),
|
||||
unknown_files: Default::default(),
|
||||
})?,
|
||||
primary_file.id.0
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let rows = sqlx::query!(
|
||||
"
|
||||
SELECT encode(mef.sha1, 'escape') sha1, mel.status status
|
||||
FROM moderation_external_files mef
|
||||
INNER JOIN moderation_external_licenses mel ON mef.external_license_id = mel.id
|
||||
WHERE mef.sha1 = ANY($1)
|
||||
",
|
||||
&hashes.iter().map(|x| x.0.as_bytes().to_vec()).collect::<Vec<_>>()
|
||||
)
|
||||
.fetch_all(&pool)
|
||||
.await?;
|
||||
|
||||
for row in rows {
|
||||
if let Some(sha1) = row.sha1 {
|
||||
if let Some((index, (sha1, _, file_name, _))) = hashes.iter().enumerate().find(|(_, (value, _, _, _))| value == &sha1) {
|
||||
final_hashes.insert(sha1.clone(), IdentifiedFile { file_name: file_name.clone(), status: ApprovalType::from_string(&row.status).unwrap_or(ApprovalType::Unidentified) });
|
||||
hashes.remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hashes.is_empty() {
|
||||
let metadata = MissingMetadata {
|
||||
identified: final_hashes,
|
||||
flame_files: Default::default(),
|
||||
unknown_files: Default::default(),
|
||||
};
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE files
|
||||
SET metadata = $1
|
||||
WHERE id = $2
|
||||
",
|
||||
serde_json::to_value(&metadata)?,
|
||||
primary_file.id.0
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
if metadata.identified.values().any(|x| x.status != ApprovalType::Yes && x.status != ApprovalType::WithAttributionAndSource) {
|
||||
let val = mod_messages.version_specific.entry(version.inner.version_number).or_default();
|
||||
val.push(ModerationMessage::PackFilesNotAllowed {files: metadata.identified, incomplete: false });
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.post(format!("{}/v1/fingerprints", dotenvy::var("FLAME_ANVIL_URL")?))
|
||||
.json(&serde_json::json!({
|
||||
"fingerprints": hashes.iter().filter_map(|x| x.3).collect::<Vec<u32>>()
|
||||
}))
|
||||
.send()
|
||||
.await?.text()
|
||||
.await?;
|
||||
|
||||
let flame_hashes = serde_json::from_str::<FlameResponse<FingerprintResponse>>(&res)?
|
||||
.data
|
||||
.exact_matches
|
||||
.into_iter()
|
||||
.map(|x| x.file)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut flame_files = Vec::new();
|
||||
|
||||
for file in flame_hashes {
|
||||
let hash = file
|
||||
.hashes
|
||||
.iter()
|
||||
.find(|x| x.algo == 1)
|
||||
.map(|x| x.value.clone());
|
||||
|
||||
if let Some(hash) = hash {
|
||||
flame_files.push((hash, file.mod_id))
|
||||
}
|
||||
}
|
||||
|
||||
let rows = sqlx::query!(
|
||||
"
|
||||
SELECT mel.id, mel.flame_project_id, mel.status status
|
||||
FROM moderation_external_licenses mel
|
||||
WHERE mel.flame_project_id = ANY($1)
|
||||
",
|
||||
&flame_files.iter().map(|x| x.1 as i32).collect::<Vec<_>>()
|
||||
)
|
||||
.fetch_all(&pool).await?;
|
||||
|
||||
let mut insert_hashes = Vec::new();
|
||||
let mut insert_ids = Vec::new();
|
||||
|
||||
for row in rows {
|
||||
if let Some((curse_index, (hash, _flame_id))) = flame_files.iter().enumerate().find(|(_, x)| Some(x.1 as i32) == row.flame_project_id) {
|
||||
if let Some((index, (sha1, _, file_name, _))) = hashes.iter().enumerate().find(|(_, (value, _, _, _))| value == hash) {
|
||||
final_hashes.insert(sha1.clone(), IdentifiedFile {
|
||||
file_name: file_name.clone(),
|
||||
status: ApprovalType::from_string(&row.status).unwrap_or(ApprovalType::Unidentified),
|
||||
});
|
||||
|
||||
insert_hashes.push(hash.clone().as_bytes().to_vec());
|
||||
insert_ids.push(row.id);
|
||||
|
||||
hashes.remove(index);
|
||||
flame_files.remove(curse_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !insert_ids.is_empty() && !insert_hashes.is_empty() {
|
||||
sqlx::query!(
|
||||
"
|
||||
INSERT INTO moderation_external_files (sha1, external_license_id)
|
||||
SELECT * FROM UNNEST ($1::bytea[], $2::bigint[])
|
||||
ON CONFLICT (sha1) DO NOTHING
|
||||
",
|
||||
&insert_hashes[..],
|
||||
&insert_ids[..]
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if hashes.is_empty() {
|
||||
let metadata = MissingMetadata {
|
||||
identified: final_hashes,
|
||||
flame_files: Default::default(),
|
||||
unknown_files: Default::default(),
|
||||
};
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE files
|
||||
SET metadata = $1
|
||||
WHERE id = $2
|
||||
",
|
||||
serde_json::to_value(&metadata)?,
|
||||
primary_file.id.0
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
if metadata.identified.values().any(|x| x.status != ApprovalType::Yes && x.status != ApprovalType::WithAttributionAndSource) {
|
||||
let val = mod_messages.version_specific.entry(version.inner.version_number).or_default();
|
||||
val.push(ModerationMessage::PackFilesNotAllowed {files: metadata.identified, incomplete: false });
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let flame_projects = if flame_files.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
let res = client
|
||||
.post(format!("{}v1/mods", dotenvy::var("FLAME_ANVIL_URL")?))
|
||||
.json(&serde_json::json!({
|
||||
"modIds": flame_files.iter().map(|x| x.1).collect::<Vec<_>>()
|
||||
}))
|
||||
.send()
|
||||
.await?
|
||||
.text()
|
||||
.await?;
|
||||
|
||||
serde_json::from_str::<FlameResponse<Vec<FlameProject>>>(&res)?.data
|
||||
};
|
||||
|
||||
let mut missing_metadata = MissingMetadata {
|
||||
identified: final_hashes,
|
||||
flame_files: HashMap::new(),
|
||||
unknown_files: HashMap::new(),
|
||||
};
|
||||
|
||||
for (sha1, _pack_file, file_name, _mumur2) in hashes {
|
||||
let flame_file = flame_files.iter().find(|x| x.0 == sha1);
|
||||
|
||||
if let Some((_, flame_project_id)) = flame_file {
|
||||
if let Some(project) = flame_projects.iter().find(|x| &x.id == flame_project_id) {
|
||||
missing_metadata.flame_files.insert(sha1, MissingMetadataFlame {
|
||||
title: project.name.clone(),
|
||||
file_name,
|
||||
url: project.links.website_url.clone(),
|
||||
id: *flame_project_id,
|
||||
});
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
missing_metadata.unknown_files.insert(sha1, file_name);
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE files
|
||||
SET metadata = $1
|
||||
WHERE id = $2
|
||||
",
|
||||
serde_json::to_value(&missing_metadata)?,
|
||||
primary_file.id.0
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
if missing_metadata.identified.values().any(|x| x.status != ApprovalType::Yes && x.status != ApprovalType::WithAttributionAndSource) {
|
||||
let val = mod_messages.version_specific.entry(version.inner.version_number).or_default();
|
||||
val.push(ModerationMessage::PackFilesNotAllowed {files: missing_metadata.identified, incomplete: true });
|
||||
}
|
||||
} else {
|
||||
let val = mod_messages.version_specific.entry(version.inner.version_number).or_default();
|
||||
val.push(ModerationMessage::NoPrimaryFile);
|
||||
}
|
||||
}
|
||||
|
||||
if !mod_messages.is_empty() {
|
||||
let first_time = database::models::Thread::get(project.thread_id, &pool).await?
|
||||
.map(|x| x.messages.iter().all(|x| x.author_id == Some(database::models::UserId(AUTOMOD_ID)) || x.hide_identity))
|
||||
.unwrap_or(true);
|
||||
|
||||
let mut transaction = pool.begin().await?;
|
||||
let id = ThreadMessageBuilder {
|
||||
author_id: Some(database::models::UserId(AUTOMOD_ID)),
|
||||
body: MessageBody::Text {
|
||||
body: mod_messages.markdown(true),
|
||||
private: false,
|
||||
replying_to: None,
|
||||
associated_images: vec![],
|
||||
},
|
||||
thread_id: project.thread_id,
|
||||
hide_identity: false,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
|
||||
let members = database::models::TeamMember::get_from_team_full(
|
||||
project.inner.team_id,
|
||||
&pool,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if mod_messages.should_reject(first_time) {
|
||||
ThreadMessageBuilder {
|
||||
author_id: Some(database::models::UserId(AUTOMOD_ID)),
|
||||
body: MessageBody::StatusChange {
|
||||
new_status: ProjectStatus::Rejected,
|
||||
old_status: project.inner.status,
|
||||
},
|
||||
thread_id: project.thread_id,
|
||||
hide_identity: false,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
|
||||
NotificationBuilder {
|
||||
body: NotificationBody::StatusChange {
|
||||
project_id: project.inner.id.into(),
|
||||
old_status: project.inner.status,
|
||||
new_status: ProjectStatus::Rejected,
|
||||
},
|
||||
}
|
||||
.insert_many(members.into_iter().map(|x| x.user_id).collect(), &mut transaction, &redis)
|
||||
.await?;
|
||||
|
||||
if let Ok(webhook_url) = dotenvy::var("MODERATION_SLACK_WEBHOOK") {
|
||||
crate::util::webhook::send_slack_webhook(
|
||||
project.inner.id.into(),
|
||||
&pool,
|
||||
&redis,
|
||||
webhook_url,
|
||||
Some(
|
||||
format!(
|
||||
"*<{}/user/AutoMod|AutoMod>* changed project status from *{}* to *Rejected*",
|
||||
dotenvy::var("SITE_URL")?,
|
||||
&project.inner.status.as_friendly_str(),
|
||||
)
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE mods
|
||||
SET status = 'rejected'
|
||||
WHERE id = $1
|
||||
",
|
||||
project.inner.id.0
|
||||
)
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
database::models::Project::clear_cache(
|
||||
project.inner.id,
|
||||
project.inner.slug.clone(),
|
||||
None,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
NotificationBuilder {
|
||||
body: NotificationBody::ModeratorMessage {
|
||||
thread_id: project.thread_id.into(),
|
||||
message_id: id.into(),
|
||||
project_id: Some(project.inner.id.into()),
|
||||
report_id: None,
|
||||
},
|
||||
}
|
||||
.insert_many(
|
||||
members.into_iter().map(|x| x.user_id).collect(),
|
||||
&mut transaction,
|
||||
&redis,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
transaction.commit().await?;
|
||||
}
|
||||
|
||||
Ok::<(), ApiError>(())
|
||||
}.await;
|
||||
|
||||
if let Err(err) = res {
|
||||
let err = err.as_api_error();
|
||||
|
||||
let mut str = String::new();
|
||||
str.push_str("## Internal AutoMod Error\n\n");
|
||||
str.push_str(&format!("Error code: {}\n\n", err.error));
|
||||
str.push_str(&format!("Error description: {}\n\n", err.description));
|
||||
|
||||
let mut transaction = pool.begin().await?;
|
||||
ThreadMessageBuilder {
|
||||
author_id: Some(database::models::UserId(AUTOMOD_ID)),
|
||||
body: MessageBody::Text {
|
||||
body: str,
|
||||
private: true,
|
||||
replying_to: None,
|
||||
associated_images: vec![],
|
||||
},
|
||||
thread_id: project.thread_id,
|
||||
hide_identity: false,
|
||||
}
|
||||
.insert(&mut transaction)
|
||||
.await?;
|
||||
transaction.commit().await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<(), ApiError>(())
|
||||
}.await.ok();
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(5)).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// License/approval review state for the files inside a modpack, as stored
/// (JSON-serialized) in the `files.metadata` column by the automod pass.
#[derive(Serialize, Deserialize)]
pub struct MissingMetadata {
    // Files matched against the moderation database, keyed by SHA-1 hex hash.
    pub identified: HashMap<String, IdentifiedFile>,
    // Files identified via the CurseForge (Flame) fingerprint API but not yet
    // reviewed, keyed by SHA-1 hex hash.
    pub flame_files: HashMap<String, MissingMetadataFlame>,
    // Files that could not be identified at all: SHA-1 hash -> file name.
    pub unknown_files: HashMap<String, String>,
}
|
||||
|
||||
/// A pack file matched against the moderation external-license database,
/// together with its redistribution approval status.
#[derive(Serialize, Deserialize)]
pub struct IdentifiedFile {
    pub file_name: String,
    // Whether the rights holder allows this file to be redistributed in packs.
    pub status: ApprovalType,
}
|
||||
|
||||
/// A pack file identified through the CurseForge (Flame) fingerprint API:
/// enough project info for a moderator to review its redistribution status.
#[derive(Serialize, Deserialize)]
pub struct MissingMetadataFlame {
    // CurseForge project display name.
    pub title: String,
    pub file_name: String,
    // CurseForge project website URL (from the project's links).
    pub url: String,
    // CurseForge project (mod) id.
    pub id: u32,
}
|
||||
|
||||
/// Redistribution permission recorded for an externally-licensed file.
/// Serialized in kebab-case (e.g. "with-attribution-and-source"), matching
/// the string forms accepted by `from_string`/produced by `as_str`.
#[derive(Deserialize, Serialize, Copy, Clone, PartialEq, Eq, Debug)]
#[serde(rename_all = "kebab-case")]
pub enum ApprovalType {
    // Redistribution allowed unconditionally.
    Yes,
    // Allowed if attribution and source are provided.
    WithAttributionAndSource,
    // Allowed if attribution is provided.
    WithAttribution,
    // Not allowed (current decision).
    No,
    // Not allowed, and the rights holder will not reconsider.
    PermanentNo,
    // No decision recorded / file not identified.
    Unidentified,
}
|
||||
|
||||
impl ApprovalType {
|
||||
fn approved(&self) -> bool {
|
||||
match self {
|
||||
ApprovalType::Yes => true,
|
||||
ApprovalType::WithAttributionAndSource => true,
|
||||
ApprovalType::WithAttribution => true,
|
||||
ApprovalType::No => false,
|
||||
ApprovalType::PermanentNo => false,
|
||||
ApprovalType::Unidentified => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_string(string: &str) -> Option<Self> {
|
||||
match string {
|
||||
"yes" => Some(ApprovalType::Yes),
|
||||
"with-attribution-and-source" => Some(ApprovalType::WithAttributionAndSource),
|
||||
"with-attribution" => Some(ApprovalType::WithAttribution),
|
||||
"no" => Some(ApprovalType::No),
|
||||
"permanent-no" => Some(ApprovalType::PermanentNo),
|
||||
"unidentified" => Some(ApprovalType::Unidentified),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
ApprovalType::Yes => "yes",
|
||||
ApprovalType::WithAttributionAndSource => "with-attribution-and-source",
|
||||
ApprovalType::WithAttribution => "with-attribution",
|
||||
ApprovalType::No => "no",
|
||||
ApprovalType::PermanentNo => "permanent-no",
|
||||
ApprovalType::Unidentified => "unidentified",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic envelope used by the CurseForge (Flame) API: the payload of
/// interest is always under a top-level `data` key.
#[derive(Deserialize, Serialize)]
pub struct FlameResponse<T> {
    pub data: T,
}
|
||||
|
||||
/// Response payload of the CurseForge fingerprint-lookup endpoint.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FingerprintResponse {
    // Files whose murmur2 fingerprint matched exactly.
    pub exact_matches: Vec<FingerprintMatch>,
}
|
||||
|
||||
/// A single exact fingerprint match returned by CurseForge.
#[derive(Deserialize, Serialize)]
pub struct FingerprintMatch {
    // CurseForge project (mod) id the matched file belongs to.
    pub id: u32,
    pub file: FlameFile,
}
|
||||
|
||||
/// A file record as returned by the CurseForge API.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlameFile {
    pub id: u32,
    // Owning CurseForge project id.
    pub mod_id: u32,
    // Hashes published for this file (value + algorithm code).
    pub hashes: Vec<FlameFileHash>,
    // CurseForge's whitespace-stripped murmur2 fingerprint of the file.
    pub file_fingerprint: u32,
}
|
||||
|
||||
/// A hash entry on a CurseForge file.
#[derive(Deserialize, Serialize, Debug)]
pub struct FlameFileHash {
    // Hex digest.
    pub value: String,
    // Numeric algorithm code as defined by the CurseForge API
    // (assumed: 1 = SHA-1, 2 = MD5 — TODO confirm against Flame docs).
    pub algo: u32,
}
|
||||
|
||||
/// A project (mod) record as returned by the CurseForge API; only the fields
/// needed for moderation messages are deserialized.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlameProject {
    pub id: u32,
    pub name: String,
    pub slug: String,
    pub links: FlameLinks,
}
|
||||
|
||||
/// External links attached to a CurseForge project.
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlameLinks {
    pub website_url: String,
}
|
||||
|
||||
fn hash_flame_murmur32(input: Vec<u8>) -> u32 {
|
||||
murmur2::murmur2(
|
||||
&input
|
||||
.into_iter()
|
||||
.filter(|x| *x != 9 && *x != 10 && *x != 13 && *x != 32)
|
||||
.collect::<Vec<u8>>(),
|
||||
1,
|
||||
)
|
||||
}
|
||||
873
apps/labrinth/src/queue/payouts.rs
Normal file
873
apps/labrinth/src/queue/payouts.rs
Normal file
@@ -0,0 +1,873 @@
|
||||
use crate::models::payouts::{
|
||||
PayoutDecimal, PayoutInterval, PayoutMethod, PayoutMethodFee, PayoutMethodType,
|
||||
};
|
||||
use crate::models::projects::MonetizationStatus;
|
||||
use crate::routes::ApiError;
|
||||
use base64::Engine;
|
||||
use chrono::{DateTime, Datelike, Duration, TimeZone, Utc};
|
||||
use dashmap::DashMap;
|
||||
use futures::TryStreamExt;
|
||||
use reqwest::Method;
|
||||
use rust_decimal::Decimal;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use sqlx::postgres::PgQueryResult;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Shared payout state: caches the PayPal OAuth credential and the Tremendous
/// payout-method catalogue behind async `RwLock`s so concurrent handlers can
/// reuse them until they expire.
pub struct PayoutsQueue {
    // Cached PayPal OAuth token; `None` until the first refresh.
    credential: RwLock<Option<PayPalCredentials>>,
    // Cached payout-method list; refreshed once its `expires` passes.
    payout_options: RwLock<Option<PayoutMethods>>,
}
|
||||
|
||||
/// A PayPal OAuth access token plus the instant it stops being valid.
#[derive(Clone)]
struct PayPalCredentials {
    access_token: String,
    // Token scheme reported by PayPal (used verbatim in the Authorization
    // header as "{token_type} {access_token}").
    token_type: String,
    // Computed locally as now + `expires_in` at refresh time.
    expires: DateTime<Utc>,
}
|
||||
|
||||
/// Cached list of available payout methods with its expiry instant
/// (set to 6 hours after it is fetched).
#[derive(Clone)]
struct PayoutMethods {
    options: Vec<PayoutMethod>,
    expires: DateTime<Utc>,
}
|
||||
|
||||
// `Default` simply delegates to `new`, so `PayoutsQueue::default()` and
// `PayoutsQueue::new()` are interchangeable.
impl Default for PayoutsQueue {
    fn default() -> Self {
        Self::new()
    }
}
|
||||
// Batches payouts and handles token refresh
impl PayoutsQueue {
    /// Creates an empty queue; credentials and payout methods are fetched
    /// lazily on first use.
    pub fn new() -> Self {
        PayoutsQueue {
            credential: RwLock::new(None),
            payout_options: RwLock::new(None),
        }
    }

    /// Fetches a fresh PayPal OAuth token (client-credentials grant), caches
    /// it, and returns it.
    ///
    /// Holds the `credential` write lock across the whole HTTP exchange, so
    /// concurrent refreshers serialize (a racing second caller may still
    /// refresh again after the first — benign, just an extra request).
    async fn refresh_token(&self) -> Result<PayPalCredentials, ApiError> {
        let mut creds = self.credential.write().await;
        let client = reqwest::Client::new();

        // HTTP Basic auth: base64("client_id:client_secret").
        let combined_key = format!(
            "{}:{}",
            dotenvy::var("PAYPAL_CLIENT_ID")?,
            dotenvy::var("PAYPAL_CLIENT_SECRET")?
        );
        let formatted_key = format!(
            "Basic {}",
            base64::engine::general_purpose::STANDARD.encode(combined_key)
        );

        let mut form = HashMap::new();
        form.insert("grant_type", "client_credentials");

        // Wire shape of PayPal's token response; only what we need.
        #[derive(Deserialize)]
        struct PaypalCredential {
            access_token: String,
            token_type: String,
            expires_in: i64,
        }

        let credential: PaypalCredential = client
            .post(format!("{}oauth2/token", dotenvy::var("PAYPAL_API_URL")?))
            .header("Accept", "application/json")
            .header("Accept-Language", "en_US")
            .header("Authorization", formatted_key)
            .form(&form)
            .send()
            .await
            .map_err(|_| ApiError::Payments("Error while authenticating with PayPal".to_string()))?
            .json()
            .await
            .map_err(|_| {
                ApiError::Payments(
                    "Error while authenticating with PayPal (deser error)".to_string(),
                )
            })?;

        // NOTE(review): expiry is taken at face value with no safety margin,
        // so a token can expire between this check and its use downstream.
        let new_creds = PayPalCredentials {
            access_token: credential.access_token,
            token_type: credential.token_type,
            expires: Utc::now() + Duration::seconds(credential.expires_in),
        };

        *creds = Some(new_creds.clone());

        Ok(new_creds)
    }

    /// Sends an authenticated request to the PayPal API and deserializes the
    /// JSON response into `X`.
    ///
    /// * `body` — JSON-serialized request body (takes precedence over
    ///   `raw_text`).
    /// * `raw_text` — pre-serialized JSON body, sent verbatim.
    /// * `no_api_prefix` — when `Some(true)`, `path` is treated as a full URL
    ///   instead of being appended to `PAYPAL_API_URL`.
    ///
    /// Refreshes the cached OAuth token first if it is missing or expired.
    /// Non-2xx responses are decoded into `ApiError::Payments` using PayPal's
    /// two error shapes (REST errors and identity errors).
    pub async fn make_paypal_request<T: Serialize, X: DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        body: Option<T>,
        raw_text: Option<String>,
        no_api_prefix: Option<bool>,
    ) -> Result<X, ApiError> {
        // Fast path: reuse the cached token; otherwise drop the read guard
        // before refreshing (refresh takes the write lock).
        let read = self.credential.read().await;
        let credentials = if let Some(credentials) = read.as_ref() {
            if credentials.expires < Utc::now() {
                drop(read);
                self.refresh_token().await.map_err(|_| {
                    ApiError::Payments("Error while authenticating with PayPal".to_string())
                })?
            } else {
                credentials.clone()
            }
        } else {
            drop(read);
            self.refresh_token().await.map_err(|_| {
                ApiError::Payments("Error while authenticating with PayPal".to_string())
            })?
        };

        let client = reqwest::Client::new();
        let mut request = client
            .request(
                method,
                if no_api_prefix.unwrap_or(false) {
                    path.to_string()
                } else {
                    format!("{}{path}", dotenvy::var("PAYPAL_API_URL")?)
                },
            )
            .header(
                "Authorization",
                format!("{} {}", credentials.token_type, credentials.access_token),
            );

        if let Some(body) = body {
            request = request.json(&body);
        } else if let Some(body) = raw_text {
            // Raw body is assumed to already be JSON; set the content type
            // explicitly since `.body()` does not.
            request = request
                .header(reqwest::header::CONTENT_TYPE, "application/json")
                .body(body);
        }

        let resp = request
            .send()
            .await
            .map_err(|_| ApiError::Payments("could not communicate with PayPal".to_string()))?;

        let status = resp.status();

        // Read the body as a generic Value first so error responses can be
        // probed against both PayPal error shapes below.
        let value = resp.json::<Value>().await.map_err(|_| {
            ApiError::Payments("could not retrieve PayPal response body".to_string())
        })?;

        if !status.is_success() {
            // REST API error shape.
            #[derive(Deserialize)]
            struct PayPalError {
                pub name: String,
                pub message: String,
            }

            // Identity (OAuth) error shape.
            #[derive(Deserialize)]
            struct PayPalIdentityError {
                pub error: String,
                pub error_description: String,
            }

            if let Ok(error) = serde_json::from_value::<PayPalError>(value.clone()) {
                return Err(ApiError::Payments(format!(
                    "error name: {}, message: {}",
                    error.name, error.message
                )));
            }

            if let Ok(error) = serde_json::from_value::<PayPalIdentityError>(value) {
                return Err(ApiError::Payments(format!(
                    "error name: {}, message: {}",
                    error.error, error.error_description
                )));
            }

            return Err(ApiError::Payments(
                "could not retrieve PayPal error body".to_string(),
            ));
        }

        Ok(serde_json::from_value(value)?)
    }

    /// Sends a bearer-authenticated request to the Tremendous API and
    /// deserializes the JSON response into `X`.
    ///
    /// Non-2xx responses carrying an `errors` object are surfaced as
    /// `ApiError::Payments` with Tremendous's message.
    pub async fn make_tremendous_request<T: Serialize, X: DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        body: Option<T>,
    ) -> Result<X, ApiError> {
        let client = reqwest::Client::new();
        let mut request = client
            .request(
                method,
                format!("{}{path}", dotenvy::var("TREMENDOUS_API_URL")?),
            )
            .header(
                "Authorization",
                format!("Bearer {}", dotenvy::var("TREMENDOUS_API_KEY")?),
            );

        if let Some(body) = body {
            request = request.json(&body);
        }

        let resp = request
            .send()
            .await
            .map_err(|_| ApiError::Payments("could not communicate with Tremendous".to_string()))?;

        let status = resp.status();

        let value = resp.json::<Value>().await.map_err(|_| {
            ApiError::Payments("could not retrieve Tremendous response body".to_string())
        })?;

        if !status.is_success() {
            if let Some(obj) = value.as_object() {
                if let Some(array) = obj.get("errors") {
                    #[derive(Deserialize)]
                    struct TremendousError {
                        message: String,
                    }

                    let err =
                        serde_json::from_value::<TremendousError>(array.clone()).map_err(|_| {
                            ApiError::Payments(
                                "could not retrieve Tremendous error json body".to_string(),
                            )
                        })?;

                    return Err(ApiError::Payments(err.message));
                }

                return Err(ApiError::Payments(
                    "could not retrieve Tremendous error body".to_string(),
                ));
            }
        }

        Ok(serde_json::from_value(value)?)
    }

    /// Returns the list of payout methods shown to users, refreshing the
    /// 6-hour cache from the Tremendous product catalogue when needed.
    ///
    /// The refreshed list is: filtered/curated Tremendous products, sorted
    /// with a hand-tuned priority, with PayPal (US), Venmo, and PayPal
    /// (international) prepended at fixed positions.
    pub async fn get_payout_methods(&self) -> Result<Vec<PayoutMethod>, ApiError> {
        // Rebuilds the cached method list from the Tremendous `products`
        // endpoint. Holds the write lock for the duration of the fetch.
        async fn refresh_payout_methods(queue: &PayoutsQueue) -> Result<PayoutMethods, ApiError> {
            let mut options = queue.payout_options.write().await;

            let mut methods = Vec::new();

            // Wire shapes for the Tremendous `products` response; only the
            // fields we consume.
            #[derive(Deserialize)]
            pub struct Sku {
                pub min: Decimal,
                pub max: Decimal,
            }

            #[derive(Deserialize, Eq, PartialEq)]
            #[serde(rename_all = "snake_case")]
            pub enum ProductImageType {
                Card,
                Logo,
            }

            #[derive(Deserialize)]
            pub struct ProductImage {
                pub src: String,
                #[serde(rename = "type")]
                pub type_: ProductImageType,
            }

            #[derive(Deserialize)]
            pub struct ProductCountry {
                pub abbr: String,
            }

            #[derive(Deserialize)]
            pub struct Product {
                pub id: String,
                pub category: String,
                pub name: String,
                // pub description: String,
                // pub disclosure: String,
                pub skus: Vec<Sku>,
                pub currency_codes: Vec<String>,
                pub countries: Vec<ProductCountry>,
                pub images: Vec<ProductImage>,
            }

            #[derive(Deserialize)]
            pub struct TremendousResponse {
                pub products: Vec<Product>,
            }

            let response = queue
                .make_tremendous_request::<(), TremendousResponse>(Method::GET, "products", None)
                .await?;

            for product in response.products {
                // Products we never offer, by Tremendous product id.
                const BLACKLISTED_IDS: &[&str] = &[
                    // physical visa
                    "A2J05SWPI2QG",
                    // crypto
                    "1UOOSHUUYTAM",
                    "5EVJN47HPDFT",
                    "NI9M4EVAVGFJ",
                    "VLY29QHTMNGT",
                    "7XU98H109Y3A",
                    "0CGEDFP2UIKV",
                    "PDYLQU0K073Y",
                    "HCS5Z7O2NV5G",
                    "IY1VMST1MOXS",
                    "VRPZLJ7HCA8X",
                    // bitcard (crypto)
                    "GWQQS5RM8IZS",
                    "896MYD4SGOGZ",
                    "PWLEN1VZGMZA",
                    "A2VRM96J5K5W",
                    "HV9ICIM3JT7P",
                    "K2KLSPVWC2Q4",
                    "HRBRQLLTDF95",
                    "UUBYLZVK7QAB",
                    "BH8W3XEDEOJN",
                    "7WGE043X1RYQ",
                    "2B13MHUZZVTF",
                    "JN6R44P86EYX",
                    "DA8H43GU84SO",
                    "QK2XAQHSDEH4",
                    "J7K1IQFS76DK",
                    "NL4JQ2G7UPRZ",
                    "OEFTMSBA5ELH",
                    "A3CQK6UHNV27",
                ];
                // Product categories we can actually fulfill.
                const SUPPORTED_METHODS: &[&str] = &[
                    "merchant_cards",
                    "merchant_card",
                    "visa",
                    "bank",
                    "ach",
                    "visa_card",
                ];

                if !SUPPORTED_METHODS.contains(&&*product.category)
                    || BLACKLISTED_IDS.contains(&&*product.id)
                {
                    continue;
                };

                let method = PayoutMethod {
                    id: product.id,
                    type_: PayoutMethodType::Tremendous,
                    name: product.name.clone(),
                    supported_countries: product.countries.into_iter().map(|x| x.abbr).collect(),
                    image_url: product
                        .images
                        .into_iter()
                        .find(|x| x.type_ == ProductImageType::Card)
                        .map(|x| x.src),
                    // Multiple SKUs => fixed denominations (sorted ascending
                    // by minimum); one SKU => a min/max range; none => a
                    // permissive default range.
                    interval: if product.skus.len() > 1 {
                        let mut values = product
                            .skus
                            .into_iter()
                            .map(|x| PayoutDecimal(x.min))
                            .collect::<Vec<_>>();
                        values.sort_by(|a, b| a.0.cmp(&b.0));

                        PayoutInterval::Fixed { values }
                    } else if let Some(first) = product.skus.first() {
                        PayoutInterval::Standard {
                            min: first.min,
                            max: first.max,
                        }
                    } else {
                        PayoutInterval::Standard {
                            min: Decimal::ZERO,
                            max: Decimal::from(5_000),
                        }
                    },
                    // ACH transfers carry a 4% fee with a $0.25 minimum;
                    // other Tremendous products are fee-free to the user.
                    fee: if product.category == "ach" {
                        PayoutMethodFee {
                            percentage: Decimal::from(4) / Decimal::from(100),
                            min: Decimal::from(1) / Decimal::from(4),
                            max: None,
                        }
                    } else {
                        PayoutMethodFee {
                            percentage: Default::default(),
                            min: Default::default(),
                            max: None,
                        }
                    },
                };

                // we do not support interval gift cards with non US based currencies since we cannot do currency conversions properly
                if let PayoutInterval::Fixed { .. } = method.interval {
                    if !product.currency_codes.contains(&"USD".to_string()) {
                        continue;
                    }
                }

                methods.push(method);
            }

            // Hand-curated ordering: these ids are pinned to the top/bottom
            // of the list; everything else sorts alphabetically.
            const UPRANK_IDS: &[&str] = &["ET0ZVETV5ILN", "Q24BD9EZ332JT", "UIL1ZYJU5MKN"];
            const DOWNRANK_IDS: &[&str] = &["EIPF8Q00EMM1", "OU2MWXYWPNWQ"];

            methods.sort_by(|a, b| {
                let a_top = UPRANK_IDS.contains(&&*a.id);
                let a_bottom = DOWNRANK_IDS.contains(&&*a.id);
                let b_top = UPRANK_IDS.contains(&&*b.id);
                let b_bottom = DOWNRANK_IDS.contains(&&*b.id);

                match (a_top, a_bottom, b_top, b_bottom) {
                    (true, _, true, _) => a.name.cmp(&b.name), // Both in top_priority: sort alphabetically
                    (_, true, _, true) => a.name.cmp(&b.name), // Both in bottom_priority: sort alphabetically
                    (true, _, _, _) => std::cmp::Ordering::Less, // a in top_priority: a comes first
                    (_, _, true, _) => std::cmp::Ordering::Greater, // b in top_priority: b comes first
                    (_, true, _, _) => std::cmp::Ordering::Greater, // a in bottom_priority: b comes first
                    (_, _, _, true) => std::cmp::Ordering::Less, // b in bottom_priority: a comes first
                    (_, _, _, _) => a.name.cmp(&b.name), // Neither in priority: sort alphabetically
                }
            });

            // PayPal (US) and Venmo are synthesized locally (not Tremendous
            // products) and pinned to positions 0 and 1.
            {
                let paypal_us = PayoutMethod {
                    id: "paypal_us".to_string(),
                    type_: PayoutMethodType::PayPal,
                    name: "PayPal".to_string(),
                    supported_countries: vec!["US".to_string()],
                    image_url: None,
                    interval: PayoutInterval::Standard {
                        min: Decimal::from(1) / Decimal::from(4),
                        max: Decimal::from(100_000),
                    },
                    fee: PayoutMethodFee {
                        percentage: Decimal::from(2) / Decimal::from(100),
                        min: Decimal::from(1) / Decimal::from(4),
                        max: Some(Decimal::from(1)),
                    },
                };

                // Venmo shares PayPal US's terms; only identity fields differ.
                let mut venmo = paypal_us.clone();
                venmo.id = "venmo".to_string();
                venmo.name = "Venmo".to_string();
                venmo.type_ = PayoutMethodType::Venmo;

                methods.insert(0, paypal_us);
                methods.insert(1, venmo)
            }

            // PayPal for every non-US country, pinned to position 2.
            methods.insert(
                2,
                PayoutMethod {
                    id: "paypal_in".to_string(),
                    type_: PayoutMethodType::PayPal,
                    name: "PayPal".to_string(),
                    supported_countries: rust_iso3166::ALL
                        .iter()
                        .filter(|x| x.alpha2 != "US")
                        .map(|x| x.alpha2.to_string())
                        .collect(),
                    image_url: None,
                    interval: PayoutInterval::Standard {
                        min: Decimal::from(1) / Decimal::from(4),
                        max: Decimal::from(100_000),
                    },
                    fee: PayoutMethodFee {
                        percentage: Decimal::from(2) / Decimal::from(100),
                        min: Decimal::ZERO,
                        max: Some(Decimal::from(20)),
                    },
                },
            );

            let new_options = PayoutMethods {
                options: methods,
                expires: Utc::now() + Duration::hours(6),
            };

            *options = Some(new_options.clone());

            Ok(new_options)
        }

        // Same cache pattern as the PayPal token: reuse if fresh, otherwise
        // drop the read guard and rebuild under the write lock.
        let read = self.payout_options.read().await;
        let options = if let Some(options) = read.as_ref() {
            if options.expires < Utc::now() {
                drop(read);
                refresh_payout_methods(self).await?
            } else {
                options.clone()
            }
        } else {
            drop(read);
            refresh_payout_methods(self).await?
        };

        Ok(options.options)
    }
}
|
||||
|
||||
/// One metric series returned by the Aditude insights API.
#[derive(Deserialize)]
pub struct AditudePoints {
    #[serde(rename = "pointsList")]
    pub points_list: Vec<AditudePoint>,
}
|
||||
|
||||
/// A single timestamped data point in an Aditude metric series.
#[derive(Deserialize)]
pub struct AditudePoint {
    pub metric: AditudeMetric,
    pub time: AditudeTime,
}
|
||||
|
||||
/// Metric values for one point; each field is present only when that metric
/// was requested.
#[derive(Deserialize)]
pub struct AditudeMetric {
    // Ad revenue for the interval (assumed USD — TODO confirm).
    pub revenue: Option<Decimal>,
    pub impressions: Option<u128>,
    // Cost per mille (revenue per 1000 impressions).
    pub cpm: Option<Decimal>,
}
|
||||
|
||||
/// Timestamp of a data point, as a Unix epoch in seconds.
#[derive(Deserialize)]
pub struct AditudeTime {
    pub seconds: u64,
}
|
||||
|
||||
pub async fn make_aditude_request(
|
||||
metrics: &[&str],
|
||||
range: &str,
|
||||
interval: &str,
|
||||
) -> Result<Vec<AditudePoints>, ApiError> {
|
||||
let request = reqwest::Client::new()
|
||||
.post("https://cloud.aditude.io/api/public/insights/metrics")
|
||||
.bearer_auth(&dotenvy::var("ADITUDE_API_KEY")?)
|
||||
.json(&serde_json::json!({
|
||||
"metrics": metrics,
|
||||
"range": range,
|
||||
"interval": interval
|
||||
}))
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
|
||||
let text = request.text().await?;
|
||||
|
||||
let json: Vec<AditudePoints> = serde_json::from_str(&text)?;
|
||||
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
pub async fn process_payout(pool: &PgPool, client: &clickhouse::Client) -> Result<(), ApiError> {
|
||||
let start: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
|
||||
(Utc::now() - Duration::days(1))
|
||||
.date_naive()
|
||||
.and_hms_nano_opt(0, 0, 0, 0)
|
||||
.unwrap_or_default(),
|
||||
Utc,
|
||||
);
|
||||
|
||||
let results = sqlx::query!(
|
||||
"SELECT EXISTS(SELECT 1 FROM payouts_values WHERE created = $1)",
|
||||
start,
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
if results.exists.unwrap_or(false) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let end = start + Duration::days(1);
|
||||
#[derive(Deserialize, clickhouse::Row)]
|
||||
struct ProjectMultiplier {
|
||||
pub page_views: u64,
|
||||
pub project_id: u64,
|
||||
}
|
||||
|
||||
let (views_values, views_sum, downloads_values, downloads_sum) = futures::future::try_join4(
|
||||
client
|
||||
.query(
|
||||
r#"
|
||||
SELECT COUNT(1) page_views, project_id
|
||||
FROM views
|
||||
WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE)
|
||||
GROUP BY project_id
|
||||
ORDER BY page_views DESC
|
||||
"#,
|
||||
)
|
||||
.bind(start.timestamp())
|
||||
.bind(end.timestamp())
|
||||
.fetch_all::<ProjectMultiplier>(),
|
||||
client
|
||||
.query("SELECT COUNT(1) FROM views WHERE (recorded BETWEEN ? AND ?) AND (project_id != 0) AND (monetized = TRUE)")
|
||||
.bind(start.timestamp())
|
||||
.bind(end.timestamp())
|
||||
.fetch_one::<u64>(),
|
||||
client
|
||||
.query(
|
||||
r#"
|
||||
SELECT COUNT(1) page_views, project_id
|
||||
FROM downloads
|
||||
WHERE (recorded BETWEEN ? AND ?) AND (user_id != 0)
|
||||
GROUP BY project_id
|
||||
ORDER BY page_views DESC
|
||||
"#,
|
||||
)
|
||||
.bind(start.timestamp())
|
||||
.bind(end.timestamp())
|
||||
.fetch_all::<ProjectMultiplier>(),
|
||||
client
|
||||
.query("SELECT COUNT(1) FROM downloads WHERE (recorded BETWEEN ? AND ?) AND (user_id != 0)")
|
||||
.bind(start.timestamp())
|
||||
.bind(end.timestamp())
|
||||
.fetch_one::<u64>(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut transaction = pool.begin().await?;
|
||||
|
||||
struct PayoutMultipliers {
|
||||
sum: u64,
|
||||
values: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
let mut views_values = views_values
|
||||
.into_iter()
|
||||
.map(|x| (x.project_id, x.page_views))
|
||||
.collect::<HashMap<u64, u64>>();
|
||||
let downloads_values = downloads_values
|
||||
.into_iter()
|
||||
.map(|x| (x.project_id, x.page_views))
|
||||
.collect::<HashMap<u64, u64>>();
|
||||
|
||||
for (key, value) in downloads_values.iter() {
|
||||
let counter = views_values.entry(*key).or_insert(0);
|
||||
*counter += *value;
|
||||
}
|
||||
|
||||
let multipliers: PayoutMultipliers = PayoutMultipliers {
|
||||
sum: downloads_sum + views_sum,
|
||||
values: views_values,
|
||||
};
|
||||
|
||||
struct Project {
|
||||
// user_id, payouts_split
|
||||
team_members: Vec<(i64, Decimal)>,
|
||||
}
|
||||
|
||||
let mut projects_map: HashMap<i64, Project> = HashMap::new();
|
||||
|
||||
let project_ids = multipliers
|
||||
.values
|
||||
.keys()
|
||||
.map(|x| *x as i64)
|
||||
.collect::<Vec<i64>>();
|
||||
|
||||
let project_org_members = sqlx::query!(
|
||||
"
|
||||
SELECT m.id id, tm.user_id user_id, tm.payouts_split payouts_split
|
||||
FROM mods m
|
||||
INNER JOIN organizations o ON m.organization_id = o.id
|
||||
INNER JOIN team_members tm on o.team_id = tm.team_id AND tm.accepted = TRUE
|
||||
WHERE m.id = ANY($1) AND m.monetization_status = $2 AND m.status = ANY($3) AND m.organization_id IS NOT NULL
|
||||
",
|
||||
&project_ids,
|
||||
MonetizationStatus::Monetized.as_str(),
|
||||
&*crate::models::projects::ProjectStatus::iterator()
|
||||
.filter(|x| !x.is_hidden())
|
||||
.map(|x| x.to_string())
|
||||
.collect::<Vec<String>>(),
|
||||
)
|
||||
.fetch(&mut *transaction)
|
||||
.try_fold(DashMap::new(), |acc: DashMap<i64, HashMap<i64, Decimal>>, r| {
|
||||
acc.entry(r.id)
|
||||
.or_default()
|
||||
.insert(r.user_id, r.payouts_split);
|
||||
async move { Ok(acc) }
|
||||
})
|
||||
.await?;
|
||||
|
||||
let project_team_members = sqlx::query!(
|
||||
"
|
||||
SELECT m.id id, tm.user_id user_id, tm.payouts_split payouts_split
|
||||
FROM mods m
|
||||
INNER JOIN team_members tm on m.team_id = tm.team_id AND tm.accepted = TRUE
|
||||
WHERE m.id = ANY($1) AND m.monetization_status = $2 AND m.status = ANY($3)
|
||||
",
|
||||
&project_ids,
|
||||
MonetizationStatus::Monetized.as_str(),
|
||||
&*crate::models::projects::ProjectStatus::iterator()
|
||||
.filter(|x| !x.is_hidden())
|
||||
.map(|x| x.to_string())
|
||||
.collect::<Vec<String>>(),
|
||||
)
|
||||
.fetch(&mut *transaction)
|
||||
.try_fold(
|
||||
DashMap::new(),
|
||||
|acc: DashMap<i64, HashMap<i64, Decimal>>, r| {
|
||||
acc.entry(r.id)
|
||||
.or_default()
|
||||
.insert(r.user_id, r.payouts_split);
|
||||
async move { Ok(acc) }
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
for project_id in project_ids {
|
||||
let team_members: HashMap<i64, Decimal> = project_team_members
|
||||
.remove(&project_id)
|
||||
.unwrap_or((0, HashMap::new()))
|
||||
.1;
|
||||
let org_team_members: HashMap<i64, Decimal> = project_org_members
|
||||
.remove(&project_id)
|
||||
.unwrap_or((0, HashMap::new()))
|
||||
.1;
|
||||
|
||||
let mut all_team_members = vec![];
|
||||
|
||||
for (user_id, payouts_split) in org_team_members {
|
||||
if !team_members.contains_key(&user_id) {
|
||||
all_team_members.push((user_id, payouts_split));
|
||||
}
|
||||
}
|
||||
for (user_id, payouts_split) in team_members {
|
||||
all_team_members.push((user_id, payouts_split));
|
||||
}
|
||||
|
||||
// if all team members are set to zero, we treat as an equal revenue distribution
|
||||
if all_team_members.iter().all(|x| x.1 == Decimal::ZERO) {
|
||||
all_team_members
|
||||
.iter_mut()
|
||||
.for_each(|x| x.1 = Decimal::from(1));
|
||||
}
|
||||
|
||||
projects_map.insert(
|
||||
project_id,
|
||||
Project {
|
||||
team_members: all_team_members,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let aditude_res =
|
||||
make_aditude_request(&["METRIC_IMPRESSIONS", "METRIC_REVENUE"], "Yesterday", "1d").await?;
|
||||
|
||||
let aditude_amount: Decimal = aditude_res
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.points_list
|
||||
.iter()
|
||||
.filter_map(|x| x.metric.revenue)
|
||||
.sum::<Decimal>()
|
||||
})
|
||||
.sum();
|
||||
let aditude_impressions: u128 = aditude_res
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.points_list
|
||||
.iter()
|
||||
.filter_map(|x| x.metric.impressions)
|
||||
.sum::<u128>()
|
||||
})
|
||||
.sum();
|
||||
|
||||
// Modrinth's share of ad revenue
|
||||
let modrinth_cut = Decimal::from(1) / Decimal::from(4);
|
||||
// Clean.io fee (ad antimalware). Per 1000 impressions.
|
||||
let clean_io_fee = Decimal::from(8) / Decimal::from(1000);
|
||||
|
||||
let net_revenue =
|
||||
aditude_amount - (clean_io_fee * Decimal::from(aditude_impressions) / Decimal::from(1000));
|
||||
|
||||
let payout = net_revenue * (Decimal::from(1) - modrinth_cut);
|
||||
|
||||
// Ad payouts are Net 60 from the end of the month
|
||||
let available = {
|
||||
let now = Utc::now().date_naive();
|
||||
|
||||
let year = now.year();
|
||||
let month = now.month();
|
||||
|
||||
// Get the first day of the next month
|
||||
let last_day_of_month = if month == 12 {
|
||||
Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap()
|
||||
} else {
|
||||
Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap()
|
||||
};
|
||||
|
||||
last_day_of_month + Duration::days(59)
|
||||
};
|
||||
|
||||
let (
|
||||
mut insert_user_ids,
|
||||
mut insert_project_ids,
|
||||
mut insert_payouts,
|
||||
mut insert_starts,
|
||||
mut insert_availables,
|
||||
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
|
||||
for (id, project) in projects_map {
|
||||
if let Some(value) = &multipliers.values.get(&(id as u64)) {
|
||||
let project_multiplier: Decimal =
|
||||
Decimal::from(**value) / Decimal::from(multipliers.sum);
|
||||
|
||||
let sum_splits: Decimal = project.team_members.iter().map(|x| x.1).sum();
|
||||
|
||||
if sum_splits > Decimal::ZERO {
|
||||
for (user_id, split) in project.team_members {
|
||||
let payout: Decimal = payout * project_multiplier * (split / sum_splits);
|
||||
|
||||
if payout > Decimal::ZERO {
|
||||
insert_user_ids.push(user_id);
|
||||
insert_project_ids.push(id);
|
||||
insert_payouts.push(payout);
|
||||
insert_starts.push(start);
|
||||
insert_availables.push(available);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query!(
|
||||
"
|
||||
INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)
|
||||
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[])
|
||||
",
|
||||
&insert_user_ids[..],
|
||||
&insert_project_ids[..],
|
||||
&insert_payouts[..],
|
||||
&insert_starts[..],
|
||||
&insert_availables[..]
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Used for testing, should be the same as the above function
|
||||
pub async fn insert_payouts(
|
||||
insert_user_ids: Vec<i64>,
|
||||
insert_project_ids: Vec<i64>,
|
||||
insert_payouts: Vec<Decimal>,
|
||||
insert_starts: Vec<DateTime<Utc>>,
|
||||
insert_availables: Vec<DateTime<Utc>>,
|
||||
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||
) -> sqlx::Result<PgQueryResult> {
|
||||
sqlx::query!(
|
||||
"
|
||||
INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)
|
||||
SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[])
|
||||
",
|
||||
&insert_user_ids[..],
|
||||
&insert_project_ids[..],
|
||||
&insert_payouts[..],
|
||||
&insert_starts[..],
|
||||
&insert_availables[..],
|
||||
)
|
||||
.execute(&mut **transaction)
|
||||
.await
|
||||
}
|
||||
159
apps/labrinth/src/queue/session.rs
Normal file
159
apps/labrinth/src/queue/session.rs
Normal file
@@ -0,0 +1,159 @@
|
||||
use crate::database::models::pat_item::PersonalAccessToken;
|
||||
use crate::database::models::session_item::Session;
|
||||
use crate::database::models::{DatabaseError, OAuthAccessTokenId, PatId, SessionId, UserId};
|
||||
use crate::database::redis::RedisPool;
|
||||
use crate::routes::internal::session::SessionMetadata;
|
||||
use chrono::Utc;
|
||||
use itertools::Itertools;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// In-memory buffer of authentication activity ("this session / PAT / OAuth
/// token was just used") that is flushed to Postgres in one batch by
/// [`AuthQueue::index`] instead of issuing a write on every request.
pub struct AuthQueue {
    // Sessions awaiting a `last_login` + client-metadata update, keyed by id.
    // A repeat hit on the same session overwrites the pending metadata.
    session_queue: Mutex<HashMap<SessionId, SessionMetadata>>,
    // Personal access tokens whose `last_used` timestamp needs refreshing.
    pat_queue: Mutex<HashSet<PatId>>,
    // OAuth access tokens whose `last_used` timestamp needs refreshing.
    oauth_access_token_queue: Mutex<HashSet<OAuthAccessTokenId>>,
}
|
||||
|
||||
impl Default for AuthQueue {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Batches session accessing transactions every 30 seconds
impl AuthQueue {
    /// Creates an empty queue. Capacities are pre-reserved so a typical flush
    /// interval's worth of entries does not reallocate.
    pub fn new() -> Self {
        AuthQueue {
            session_queue: Mutex::new(HashMap::with_capacity(1000)),
            pat_queue: Mutex::new(HashSet::with_capacity(1000)),
            oauth_access_token_queue: Mutex::new(HashSet::with_capacity(1000)),
        }
    }

    /// Records that session `id` was used, together with the client metadata
    /// to persist. A later call for the same session replaces the pending
    /// metadata (last-write-wins).
    pub async fn add_session(&self, id: SessionId, metadata: SessionMetadata) {
        self.session_queue.lock().await.insert(id, metadata);
    }

    /// Records that the personal access token `id` was used.
    pub async fn add_pat(&self, id: PatId) {
        self.pat_queue.lock().await.insert(id);
    }

    /// Records that the OAuth access token `id` was used.
    pub async fn add_oauth_access_token(&self, id: crate::database::models::OAuthAccessTokenId) {
        self.oauth_access_token_queue.lock().await.insert(id);
    }

    /// Drains the pending session updates, leaving an empty map in their
    /// place. The replacement map is sized to the drained length so the next
    /// batch of comparable size does not reallocate.
    pub async fn take_sessions(&self) -> HashMap<SessionId, SessionMetadata> {
        let mut queue = self.session_queue.lock().await;
        let len = queue.len();

        std::mem::replace(&mut queue, HashMap::with_capacity(len))
    }

    /// Drains a queued `HashSet` behind a mutex, leaving an empty set (sized
    /// to the drained length) in its place. Associated fn: works for both the
    /// PAT and the OAuth-token queues.
    pub async fn take_hashset<T>(queue: &Mutex<HashSet<T>>) -> HashSet<T> {
        let mut queue = queue.lock().await;
        let len = queue.len();

        std::mem::replace(&mut queue, HashSet::with_capacity(len))
    }

    /// Flushes all queued activity to Postgres in a single transaction and
    /// invalidates the affected Redis cache entries:
    ///   - updates `last_login` + client metadata for each queued session,
    ///   - removes sessions whose `refresh_expires` has passed,
    ///   - bumps `last_used` on queued PATs and OAuth access tokens.
    /// Does nothing at all when every queue is empty.
    pub async fn index(&self, pool: &PgPool, redis: &RedisPool) -> Result<(), DatabaseError> {
        let session_queue = self.take_sessions().await;
        let pat_queue = Self::take_hashset(&self.pat_queue).await;
        let oauth_access_token_queue = Self::take_hashset(&self.oauth_access_token_queue).await;

        if !session_queue.is_empty()
            || !pat_queue.is_empty()
            || !oauth_access_token_queue.is_empty()
        {
            let mut transaction = pool.begin().await?;
            let mut clear_cache_sessions = Vec::new();

            // One UPDATE per queued session; each also gets its cache entry
            // invalidated after the DB work below.
            for (id, metadata) in session_queue {
                clear_cache_sessions.push((Some(id), None, None));

                sqlx::query!(
                    "
                    UPDATE sessions
                    SET last_login = $2, city = $3, country = $4, ip = $5, os = $6, platform = $7, user_agent = $8
                    WHERE (id = $1)
                    ",
                    id as SessionId,
                    Utc::now(),
                    metadata.city,
                    metadata.country,
                    metadata.ip,
                    metadata.os,
                    metadata.platform,
                    metadata.user_agent,
                )
                .execute(&mut *transaction)
                .await?;
            }

            use futures::TryStreamExt;
            // Collect sessions past their refresh expiry so they can be
            // deleted and purged from the cache.
            let expired_ids = sqlx::query!(
                "
                SELECT id, session, user_id
                FROM sessions
                WHERE refresh_expires <= NOW()
                "
            )
            .fetch(&mut *transaction)
            .map_ok(|x| (SessionId(x.id), x.session, UserId(x.user_id)))
            .try_collect::<Vec<(SessionId, String, UserId)>>()
            .await?;

            for (id, session, user_id) in expired_ids {
                clear_cache_sessions.push((Some(id), Some(session), Some(user_id)));
                Session::remove(id, &mut transaction).await?;
            }

            // NOTE(review): the session cache is cleared *before* the
            // transaction commits; if the commit fails the DB changes roll
            // back but the cache entries are already gone. Presumably a
            // harmless extra cache miss — confirm this ordering is intended
            // (the PAT cache below is cleared after commit instead).
            Session::clear_cache(clear_cache_sessions, redis).await?;

            let ids = pat_queue.iter().map(|id| id.0).collect_vec();
            let clear_cache_pats = pat_queue
                .into_iter()
                .map(|id| (Some(id), None, None))
                .collect_vec();
            // Runs even with an empty id array; UNNEST of [] matches no rows.
            sqlx::query!(
                "
                UPDATE pats
                SET last_used = $2
                WHERE id IN
                (SELECT * FROM UNNEST($1::bigint[]))
                ",
                &ids[..],
                Utc::now(),
            )
            .execute(&mut *transaction)
            .await?;

            update_oauth_access_token_last_used(oauth_access_token_queue, &mut transaction).await?;

            transaction.commit().await?;
            PersonalAccessToken::clear_cache(clear_cache_pats, redis).await?;
        }

        Ok(())
    }
}
|
||||
|
||||
async fn update_oauth_access_token_last_used(
|
||||
oauth_access_token_queue: HashSet<OAuthAccessTokenId>,
|
||||
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||
) -> Result<(), DatabaseError> {
|
||||
let ids = oauth_access_token_queue.iter().map(|id| id.0).collect_vec();
|
||||
sqlx::query!(
|
||||
"
|
||||
UPDATE oauth_access_tokens
|
||||
SET last_used = $2
|
||||
WHERE id IN
|
||||
(SELECT * FROM UNNEST($1::bigint[]))
|
||||
",
|
||||
&ids[..],
|
||||
Utc::now()
|
||||
)
|
||||
.execute(&mut **transaction)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
15
apps/labrinth/src/queue/socket.rs
Normal file
15
apps/labrinth/src/queue/socket.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
//! "Database" for Hydra
|
||||
use actix_ws::Session;
|
||||
use dashmap::DashMap;
|
||||
|
||||
/// Registry of live WebSocket sessions used during authentication, stored in
/// a concurrent `DashMap` so multiple handlers can insert/remove without an
/// outer lock.
pub struct ActiveSockets {
    // Keyed by string — presumably a token identifying the pending auth flow;
    // TODO(review): confirm the key's meaning against the socket handlers.
    pub auth_sockets: DashMap<String, Session>,
}
|
||||
|
||||
impl Default for ActiveSockets {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
auth_sockets: DashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user